use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant, SystemTime};
use anyhow::Result;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use serde_json::json;
use tokio::sync::{Mutex as AsyncMutex, RwLock, mpsc};
use tokio_util::sync::CancellationToken;
use crate::client::DeepSeekClient;
use crate::compaction::{
CompactionConfig, compact_messages_safe, merge_system_prompts, should_compact,
};
use crate::config::{ApiProvider, Config, DEFAULT_MAX_SUBAGENTS, DEFAULT_TEXT_MODEL};
use crate::error_taxonomy::{ErrorCategory, ErrorEnvelope, StreamError};
use crate::features::{Feature, Features};
use crate::llm_client::LlmClient;
use crate::mcp::McpPool;
#[cfg(test)]
use crate::models::ToolCaller;
use crate::models::{
ContentBlock, ContentBlockStart, Delta, LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS, Message,
MessageRequest, StreamEvent, SystemPrompt, Tool, Usage,
};
use crate::prompts;
use crate::purge::{emit_purge_completed, emit_purge_failed, emit_purge_started, run_purge};
use crate::seam_manager::{SeamConfig, SeamManager};
use crate::tools::goal::{GoalSnapshot, GoalStatus, SharedGoalState, new_shared_goal_state};
use crate::tools::plan::{PlanSnapshot, SharedPlanState, new_shared_plan_state};
use crate::tools::shell::{SharedShellManager, new_shared_shell_manager};
use crate::tools::spec::RuntimeToolServices;
use crate::tools::spec::{ApprovalRequirement, ToolError, ToolResult};
use crate::tools::subagent::{
Mailbox, SharedSubAgentManager, SubAgentCompletion, SubAgentForkContext, SubAgentResult,
SubAgentRuntime, SubAgentStatus, SubAgentType, new_shared_subagent_manager_with_timeout,
resolve_subagent_assignment_route,
};
use crate::tools::todo::{SharedTodoList, TodoListSnapshot, new_shared_todo_list};
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
use crate::tools::{ToolContext, ToolRegistryBuilder};
use crate::tui::app::AppMode;
use crate::utils::spawn_supervised;
use crate::working_set::WorkingSet;
use super::capacity::{
CapacityController, CapacityControllerConfig, CapacityDecision, CapacityObservationInput,
CapacitySnapshot, GuardrailAction, RiskBand,
};
use super::capacity_memory::{
CanonicalState, CapacityMemoryRecord, ReplayInfo, append_capacity_record,
load_last_k_capacity_records, new_record_id, now_rfc3339,
};
use super::coherence::{CoherenceSignal, CoherenceState, next_coherence_state};
use super::events::{Event, TurnOutcomeStatus};
use super::ops::{Op, USER_SHELL_TOOL_ID_PREFIX};
use super::session::Session;
use super::tool_parser;
use super::turn::{TurnContext, TurnToolCall, post_turn_snapshot, pre_turn_snapshot};
/// Point-in-time snapshot of session state that can be rendered as a
/// "Fork State" markdown block (see `to_system_block`).
#[derive(Debug, Clone, Default)]
struct StructuredState {
/// Label of the active mode, rendered as the `Mode` bullet.
mode_label: String,
/// Workspace root, rendered as the `Workspace` bullet.
workspace: PathBuf,
/// Optional current working directory; only rendered when present.
cwd: Option<PathBuf>,
/// Pre-rendered working-set summary, appended verbatim when present.
working_set_summary: Option<String>,
/// Todo list snapshot; `capture` stores `None` when the list has no items.
todo_snapshot: Option<TodoListSnapshot>,
/// Plan snapshot; `capture` stores `None` when the plan state is empty.
plan_snapshot: Option<PlanSnapshot>,
/// Sub-agents that `capture` found in the `Running` status.
subagent_snapshots: Vec<SubAgentResult>,
}
impl StructuredState {
    /// Gathers the mode label, workspace, working-set summary, todo list,
    /// plan and running sub-agents into one snapshot.
    ///
    /// An empty todo list or an empty plan is stored as `None`. When a
    /// sub-agent manager is supplied, only agents whose status is `Running`
    /// are kept; without a manager the list is empty.
    async fn capture(
        mode_label: impl Into<String>,
        workspace: PathBuf,
        cwd: Option<PathBuf>,
        working_set: &WorkingSet,
        todos: &SharedTodoList,
        plan_state: &SharedPlanState,
        subagents: Option<&SharedSubAgentManager>,
    ) -> Self {
        let working_set_summary = working_set.summary_block(&workspace);

        // Each lock is held only inside its own block so it is released
        // before the next one is taken.
        let todo_snapshot = {
            let list = todos.lock().await;
            let snapshot = list.snapshot();
            let has_items = !snapshot.items.is_empty();
            has_items.then_some(snapshot)
        };

        let plan_snapshot = {
            let plan = plan_state.lock().await;
            let has_content = !plan.is_empty();
            has_content.then(|| plan.snapshot())
        };

        let subagent_snapshots = match subagents {
            Some(manager) => {
                let registry = manager.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter(|agent| matches!(agent.status, SubAgentStatus::Running))
                    .collect()
            }
            None => Vec::new(),
        };

        Self {
            mode_label: mode_label.into(),
            workspace,
            cwd,
            working_set_summary,
            todo_snapshot,
            plan_snapshot,
            subagent_snapshots,
        }
    }

    /// Renders the snapshot as a markdown block headed `## Fork State`.
    ///
    /// The header, mode and workspace lines are always written, so this
    /// currently always returns `Some`.
    #[must_use]
    fn to_system_block(&self) -> Option<String> {
        let mut block = String::from("## Fork State\n\n");
        block += &format!("- Mode: `{}`\n", self.mode_label);
        block += &format!("- Workspace: `{}`\n", self.workspace.display());
        if let Some(cwd) = &self.cwd {
            block += &format!("- Cwd: `{}`\n", cwd.display());
        }

        // The "Work" heading is shared by the checklist and the plan section.
        let nothing_tracked = self.todo_snapshot.is_none() && self.plan_snapshot.is_none();
        if !nothing_tracked {
            block.push_str("\n### Work\n");
        }

        if let Some(checklist) = &self.todo_snapshot {
            block += &format!(
                "\nChecklist ({}% complete)\n",
                checklist.completion_pct
            );
            for entry in &checklist.items {
                let marker = match entry.status {
                    crate::tools::todo::TodoStatus::Pending => "[ ]",
                    crate::tools::todo::TodoStatus::InProgress => "[~]",
                    crate::tools::todo::TodoStatus::Completed => "[x]",
                };
                block += &format!("- {marker} {}\n", entry.content);
            }
        }

        if let Some(plan) = &self.plan_snapshot {
            block.push_str("\nStrategy metadata\n");
            append_plan_field(&mut block, "Title", plan.title.as_deref());
            append_plan_field(&mut block, "Objective", plan.objective.as_deref());
            append_plan_field(&mut block, "Context", plan.context_summary.as_deref());
            append_plan_field(&mut block, "Explanation", plan.explanation.as_deref());
            append_plan_list(&mut block, "Source", &plan.sources_used);
            append_plan_list(&mut block, "Critical file", &plan.critical_files);
            append_plan_list(&mut block, "Constraint", &plan.constraints);
            append_plan_field(
                &mut block,
                "Recommended approach",
                plan.recommended_approach.as_deref(),
            );
            append_plan_field(
                &mut block,
                "Verification plan",
                plan.verification_plan.as_deref(),
            );
            append_plan_field(
                &mut block,
                "Risks and unknowns",
                plan.risks_and_unknowns.as_deref(),
            );
            append_plan_field(&mut block, "Handoff packet", plan.handoff_packet.as_deref());
            for step in &plan.items {
                let marker = match step.status {
                    crate::tools::plan::StepStatus::Pending => "[ ]",
                    crate::tools::plan::StepStatus::InProgress => "[~]",
                    crate::tools::plan::StepStatus::Completed => "[x]",
                };
                block += &format!("- {marker} {}\n", step.step);
            }
        }

        if !self.subagent_snapshots.is_empty() {
            block.push_str("\n### Open Sub-Agents\n");
            for agent in &self.subagent_snapshots {
                let role = agent.assignment.role.as_deref().unwrap_or("-");
                let goal = if agent.assignment.objective.is_empty() {
                    "(no objective set)"
                } else {
                    agent.assignment.objective.as_str()
                };
                block += &format!("- `{}` (role: {}) - {}\n", agent.agent_id, role, goal);
            }
        }

        if let Some(summary) = self.working_set_summary.as_deref() {
            block.push('\n');
            block.push_str(summary);
            block.push('\n');
        }

        Some(block)
    }
}
/// Appends a `- {label}: {value}` bullet to `out`.
///
/// Nothing is written when `value` is `None` or is blank after trimming;
/// surrounding whitespace is stripped from the rendered value.
fn append_plan_field(out: &mut String, label: &str, value: Option<&str>) {
    let Some(value) = value else {
        return;
    };
    let value = value.trim();
    if value.is_empty() {
        return;
    }
    out.push_str(&format!("- {label}: {value}\n"));
}
/// Appends one `- {label}: {value}` bullet to `out` per entry in `values`.
///
/// Each entry is trimmed first; entries that are blank after trimming are
/// skipped. Order follows `values`.
fn append_plan_list(out: &mut String, label: &str, values: &[String]) {
    values
        .iter()
        .map(|raw| raw.trim())
        .filter(|entry| !entry.is_empty())
        .for_each(|value| out.push_str(&format!("- {label}: {value}\n")));
}
/// Configuration handed to `Engine::new`.
///
/// Defaults for every field are provided by the `Default` impl.
#[derive(Debug, Clone)]
pub struct EngineConfig {
/// Model identifier; `Engine::new` passes it to the session and the system prompt.
pub model: String,
/// Workspace root used for the session, sub-agent manager and shell manager.
pub workspace: PathBuf,
pub allow_shell: bool,
pub trust_mode: bool,
pub notes_path: PathBuf,
pub mcp_config_path: PathBuf,
pub skills_dir: PathBuf,
pub instructions: Vec<crate::prompts::InstructionSource>,
pub project_context_pack_enabled: bool,
pub translation_enabled: bool,
pub show_thinking: bool,
pub verbosity: Option<String>,
pub max_steps: u32,
pub max_subagents: usize,
pub interactive_launch_limit: usize,
/// Feature flags; the engine checks e.g. `Feature::ShellTool` and `Feature::Mcp`.
pub features: Features,
/// Compaction settings; replaced at runtime by `Op::SetCompaction`.
pub compaction: CompactionConfig,
pub capacity: CapacityControllerConfig,
pub todos: SharedTodoList,
pub plan_state: SharedPlanState,
pub goal_state: SharedGoalState,
pub max_spawn_depth: u32,
pub network_policy: Option<crate::network_policy::NetworkPolicyDecider>,
/// When true, pre- and post-turn workspace snapshots are taken.
pub snapshots_enabled: bool,
/// Size cap passed to the snapshot helpers.
pub snapshots_max_workspace_bytes: u64,
/// LSP configuration; `None` makes `Engine::new` use a disabled LSP manager.
pub lsp_config: Option<crate::lsp::LspConfig>,
pub runtime_services: RuntimeToolServices,
pub subagent_model_overrides: HashMap<String, String>,
pub memory_enabled: bool,
pub memory_path: PathBuf,
pub speech_output_dir: Option<PathBuf>,
pub vision_config: Option<crate::config::VisionModelConfig>,
pub goal_objective: Option<String>,
pub goal_token_budget: Option<u32>,
pub goal_status: GoalStatus,
pub allowed_tools: Option<Vec<String>>,
pub disallowed_tools: Option<Vec<String>>,
pub hook_executor: Option<std::sync::Arc<crate::hooks::HookExecutor>>,
pub locale_tag: String,
pub strict_tool_mode: bool,
/// Workshop settings; when `Some`, `Engine::new` allocates workshop variables.
pub workshop: Option<crate::tools::large_output_router::WorkshopConfig>,
pub search_provider: crate::config::SearchProvider,
pub search_api_key: Option<String>,
pub search_base_url: Option<String>,
pub subagent_api_timeout: Duration,
/// Stream chunk timeout; updated at runtime by `Op::SetStreamChunkTimeout`.
pub stream_chunk_timeout: Duration,
pub subagent_heartbeat_timeout: Duration,
pub tools_always_load: HashSet<String>,
// NOTE(review): marked dead_code, so presumably not read anywhere yet — confirm.
#[allow(dead_code)] pub prefer_bwrap: bool,
pub tools: Option<crate::config::ToolsConfig>,
}
impl Default for EngineConfig {
    /// Builds the baseline engine configuration.
    ///
    /// Fields are listed in the same order as the struct declaration so the
    /// two can be compared side by side.
    fn default() -> Self {
        Self {
            // Session basics.
            model: DEFAULT_TEXT_MODEL.to_string(),
            workspace: PathBuf::from("."),
            allow_shell: true,
            trust_mode: false,
            notes_path: PathBuf::from("notes.txt"),
            mcp_config_path: PathBuf::from("mcp.json"),
            skills_dir: crate::skills::default_skills_dir(),
            instructions: Vec::new(),
            project_context_pack_enabled: true,
            translation_enabled: false,
            show_thinking: true,
            verbosity: None,

            // Limits and feature gating.
            max_steps: 100,
            max_subagents: DEFAULT_MAX_SUBAGENTS,
            interactive_launch_limit: crate::config::DEFAULT_INTERACTIVE_LAUNCH_LIMIT,
            features: Features::with_defaults(),
            compaction: CompactionConfig::default(),
            capacity: CapacityControllerConfig::default(),

            // Shared tool state.
            todos: new_shared_todo_list(),
            plan_state: new_shared_plan_state(),
            goal_state: new_shared_goal_state(),
            max_spawn_depth: crate::tools::subagent::DEFAULT_MAX_SPAWN_DEPTH,
            network_policy: None,

            // Snapshots, LSP and runtime services.
            snapshots_enabled: true,
            snapshots_max_workspace_bytes:
                crate::snapshot::DEFAULT_MAX_WORKSPACE_BYTES_FOR_SNAPSHOT,
            lsp_config: None,
            runtime_services: RuntimeToolServices::default(),
            subagent_model_overrides: HashMap::new(),

            // Memory, speech and vision.
            memory_enabled: false,
            memory_path: PathBuf::from("./memory.md"),
            speech_output_dir: None,
            vision_config: None,

            // Goal tracking.
            goal_objective: None,
            goal_token_budget: None,
            goal_status: GoalStatus::Active,

            // Tool policy and hooks.
            allowed_tools: None,
            disallowed_tools: None,
            hook_executor: None,
            locale_tag: "en".to_string(),
            strict_tool_mode: false,
            workshop: None,

            // Search.
            search_provider: crate::config::SearchProvider::default(),
            search_api_key: None,
            search_base_url: None,

            // Timeouts.
            subagent_api_timeout: Duration::from_secs(
                crate::config::DEFAULT_SUBAGENT_API_TIMEOUT_SECS,
            ),
            stream_chunk_timeout: Duration::from_secs(
                crate::config::DEFAULT_STREAM_CHUNK_TIMEOUT_SECS,
            ),
            subagent_heartbeat_timeout: Duration::from_secs(
                crate::config::DEFAULT_SUBAGENT_HEARTBEAT_TIMEOUT_SECS,
            ),

            // Miscellaneous tool settings.
            tools_always_load: HashSet::new(),
            prefer_bwrap: false,
            tools: None,
        }
    }
}
/// Why an in-flight request was cancelled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub enum CancelReason {
    /// The user cancelled the request.
    User,
    /// An external caller cancelled the request.
    External,
    /// A new turn preempted the request.
    Preempted,
    /// The engine was torn down before an approval resolved.
    Internal,
}

impl CancelReason {
    /// Returns a fixed human-readable sentence for this reason.
    fn describe(self) -> &'static str {
        let text = match self {
            CancelReason::User => "user cancelled the request",
            CancelReason::External => "request cancelled by external caller",
            CancelReason::Preempted => "request was preempted by a new turn",
            CancelReason::Internal => "engine torn down before approval resolved",
        };
        text
    }
}
/// Cloneable handle returned by `Engine::new` alongside the engine itself.
///
/// It holds the sending halves of the engine's channels, the event receiver,
/// and the cancellation/pause state shared with the engine.
#[derive(Clone)]
pub struct EngineHandle {
/// Sender for operations consumed by `Engine::run`.
pub tx_op: mpsc::Sender<Op>,
/// Receiver for events emitted by the engine, behind a lock so clones of the handle can share it.
pub rx_event: Arc<RwLock<mpsc::Receiver<Event>>>,
/// Same `Arc` as the engine's `shared_cancel_token`; the engine swaps in a fresh token on reset.
cancel_token: Arc<StdMutex<CancellationToken>>,
/// Same `Arc` as the engine's `cancel_reason`; the engine clears it on reset.
cancel_reason: Arc<StdMutex<Option<CancelReason>>>,
/// Sender paired with the engine's `rx_approval`.
tx_approval: mpsc::Sender<ApprovalDecision>,
/// Sender paired with the engine's `rx_user_input`.
tx_user_input: mpsc::Sender<UserInputDecision>,
/// Sender paired with the engine's `rx_steer`.
tx_steer: mpsc::Sender<String>,
/// Same `Arc` as the engine's `shared_paused`; the engine sets it to `false` on reset.
shared_paused: Arc<StdMutex<bool>>,
}
/// The engine: owns the session, the receiving halves of the control
/// channels, and the per-session managers. Constructed by `Engine::new` and
/// driven by `Engine::run`.
pub struct Engine {
config: EngineConfig,
/// API client; `None` when construction failed (see `deepseek_client_error`).
deepseek_client: Option<DeepSeekClient>,
/// Stringified client-construction error, if any.
deepseek_client_error: Option<String>,
/// Hint appended to authentication errors by `decorate_auth_error_message`.
api_key_env_only_recovery: Option<String>,
session: Session,
subagent_manager: SharedSubAgentManager,
shell_manager: SharedShellManager,
/// Starts as `None`; shut down at the end of `run` when present.
mcp_pool: Option<Arc<AsyncMutex<McpPool>>>,
api_provider: ApiProvider,
rx_op: mpsc::Receiver<Op>,
rx_approval: mpsc::Receiver<ApprovalDecision>,
rx_user_input: mpsc::Receiver<UserInputDecision>,
rx_steer: mpsc::Receiver<String>,
tx_event: mpsc::Sender<Event>,
tx_subagent_completion: mpsc::UnboundedSender<SubAgentCompletion>,
pub(super) rx_subagent_completion: mpsc::UnboundedReceiver<SubAgentCompletion>,
/// Engine-local copy of the current cancellation token.
cancel_token: CancellationToken,
/// Slot shared with `EngineHandle`; holds a clone of `cancel_token`.
shared_cancel_token: Arc<StdMutex<CancellationToken>>,
pub(super) cancel_reason: Arc<StdMutex<Option<CancelReason>>>,
tool_exec_lock: Arc<RwLock<()>>,
capacity_controller: CapacityController,
/// Only created when an API client is available.
seam_manager: Option<SeamManager>,
coherence_state: CoherenceState,
/// Incremented (saturating) at the start of each user shell turn.
turn_counter: u64,
lsp_manager: Arc<crate::lsp::LspManager>,
/// Allocated only when `config.workshop` is set.
workshop_vars: Option<
std::sync::Arc<tokio::sync::Mutex<crate::tools::large_output_router::WorkshopVariables>>,
>,
sandbox_backend: Option<std::sync::Arc<dyn crate::sandbox::backend::SandboxBackend>>,
pending_lsp_blocks: Vec<crate::lsp::DiagnosticBlock>,
slop_ledger_gate_cache: Option<(Option<SystemTime>, Option<String>)>,
/// Starts as `AppMode::Agent`; updated by `Op::ChangeMode`.
current_mode: AppMode,
token_estimate_cache: TokenEstimateCache,
/// Pause flag shared with `EngineHandle`.
shared_paused: Arc<StdMutex<bool>>,
}
impl Engine {
/// Installs a fresh cancellation token and clears per-turn cancel state.
///
/// The new token replaces both the engine-local token and the slot shared
/// with the handle; the cancel reason is reset to `None` and the paused flag
/// to `false`. A poisoned mutex is recovered and still overwritten.
fn reset_cancel_token(&mut self) {
    let fresh = CancellationToken::new();
    self.cancel_token = fresh.clone();

    *self
        .shared_cancel_token
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = fresh;

    *self
        .cancel_reason
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = None;

    *self
        .shared_paused
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = false;
}
/// Builds a recovery hint for a rejected API key that came only from the
/// environment.
///
/// Returns `None` unless `active_provider_uses_env_only_api_key` reports
/// true for `api_config`. Otherwise the hint names the environment
/// variable(s) for the active provider and the commands to inspect or
/// replace the credential.
fn env_only_api_key_recovery_hint(api_config: &Config) -> Option<String> {
    crate::config::active_provider_uses_env_only_api_key(api_config).then(|| {
        let provider = api_config.api_provider();
        let env_var = match provider {
            ApiProvider::Deepseek | ApiProvider::DeepseekCN => "DEEPSEEK_API_KEY",
            ApiProvider::NvidiaNim => "NVIDIA_API_KEY/NVIDIA_NIM_API_KEY",
            ApiProvider::Openai => "OPENAI_API_KEY",
            ApiProvider::Zai => "ZAI_API_KEY/Z_AI_API_KEY",
            ApiProvider::Stepfun => "STEPFUN_API_KEY/STEP_API_KEY",
            ApiProvider::Anthropic => "ANTHROPIC_API_KEY",
            ApiProvider::Atlascloud => "ATLASCLOUD_API_KEY",
            ApiProvider::WanjieArk => "WANJIE_ARK_API_KEY/WANJIE_API_KEY/WANJIE_MAAS_API_KEY",
            ApiProvider::Volcengine => "VOLCENGINE_API_KEY/VOLCENGINE_ARK_API_KEY/ARK_API_KEY",
            ApiProvider::Openrouter => "OPENROUTER_API_KEY",
            ApiProvider::XiaomiMimo => "XIAOMI_MIMO_API_KEY/XIAOMI_API_KEY/MIMO_API_KEY",
            ApiProvider::Novita => "NOVITA_API_KEY",
            ApiProvider::Fireworks => "FIREWORKS_API_KEY",
            ApiProvider::Siliconflow | ApiProvider::SiliconflowCn => "SILICONFLOW_API_KEY",
            ApiProvider::Arcee => "ARCEE_API_KEY",
            ApiProvider::Moonshot => "MOONSHOT_API_KEY/KIMI_API_KEY",
            ApiProvider::Sglang => "SGLANG_API_KEY",
            ApiProvider::Vllm => "VLLM_API_KEY",
            ApiProvider::Ollama => "OLLAMA_API_KEY",
            ApiProvider::Huggingface => "HUGGINGFACE_API_KEY/HF_TOKEN",
            ApiProvider::Together => "TOGETHER_API_KEY",
            ApiProvider::OpenaiCodex => "OPENAI_CODEX_ACCESS_TOKEN/CODEX_ACCESS_TOKEN",
            ApiProvider::Minimax => "MINIMAX_API_KEY",
        };
        format!(
            "The rejected key came from {env_var}; no saved config key is present.\n\
             Run `codewhale auth status` to inspect credential sources, then \
             `codewhale auth set --provider {provider}` to save a valid key in ~/.codewhale/config.toml, \
             or remove the stale export and open a fresh shell.",
            provider = provider.as_str()
        )
    })
}
/// Appends the env-only API key recovery hint to an authentication error.
///
/// `message` is returned unchanged when no hint is stored, when the message
/// is not classified as an authentication error, or when it already contains
/// the hint's marker sentence.
pub(super) fn decorate_auth_error_message(&self, message: String) -> String {
    match self.api_key_env_only_recovery.as_ref() {
        None => message,
        Some(hint) => {
            let is_auth_error = crate::error_taxonomy::classify_error_message(&message)
                == ErrorCategory::Authentication;
            // Only decorate once: skip messages that already carry the hint.
            if is_auth_error && !message.contains("no saved config key is present") {
                format!("{message}\n\n{hint}")
            } else {
                message
            }
        }
    }
}
/// Builds an engine and the handle used to talk to it.
///
/// Creates the control channels and shared cancellation/pause state, tries to
/// construct the API client (a failure is stored as a string rather than
/// returned), composes the initial system prompt, and sets up the sub-agent,
/// shell, capacity, seam, LSP, workshop and sandbox components. The returned
/// `EngineHandle` holds the opposite channel ends and the same shared state.
pub fn new(config: EngineConfig, api_config: &Config) -> (Self, EngineHandle) {
crate::tls::ensure_rustls_crypto_provider();
// Push the host-provided goal into the shared goal state, but only when
// the objective normalizes to something.
if let Some(objective) = normalized_goal_objective(config.goal_objective.as_deref()) {
sync_goal_state_from_host(
&config.goal_state,
Some(&objective),
config.goal_token_budget,
config.goal_status,
);
}
// Bounded channels between handle and engine; sub-agent completions use an unbounded one.
let (tx_op, rx_op) = mpsc::channel(32);
let (tx_event, rx_event) = mpsc::channel(256);
let (tx_approval, rx_approval) = mpsc::channel(64);
let (tx_user_input, rx_user_input) = mpsc::channel(32);
let (tx_steer, rx_steer) = mpsc::channel(64);
let (tx_subagent_completion, rx_subagent_completion) = mpsc::unbounded_channel();
// Cancellation and pause state is shared with the handle through `Arc`s.
let cancel_token = CancellationToken::new();
let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone()));
let cancel_reason: Arc<StdMutex<Option<CancelReason>>> = Arc::new(StdMutex::new(None));
let shared_paused = Arc::new(StdMutex::new(false));
let tool_exec_lock = Arc::new(RwLock::new(()));
// Client construction failure is not fatal here: keep the error text for later reporting.
let (deepseek_client, deepseek_client_error) = match DeepSeekClient::new(api_config) {
Ok(client) => (Some(client), None),
Err(err) => (None, Some(err.to_string())),
};
let api_provider = api_config.api_provider();
let api_key_env_only_recovery = Self::env_only_api_key_recovery_hint(api_config);
let mut session = Session::new(
config.model.clone(),
config.workspace.clone(),
config.allow_shell,
config.trust_mode,
config.notes_path.clone(),
config.mcp_config_path.clone(),
);
let user_memory_block =
crate::memory::compose_block(config.memory_enabled, &config.memory_path);
let prompt_goal_objective =
goal_objective_for_prompt(config.goal_objective.as_deref(), &config.goal_state);
let system_prompt =
prompts::system_prompt_for_mode_with_context_skills_session_and_approval(
&config.workspace,
None,
Some(&config.skills_dir),
Some(&config.instructions),
prompts::PromptSessionContext {
user_memory_block: user_memory_block.as_deref(),
goal_objective: prompt_goal_objective.as_deref(),
project_context_pack_enabled: config.project_context_pack_enabled,
locale_tag: &config.locale_tag,
translation_enabled: config.translation_enabled,
model_id: &config.model,
context_window_override: Some(
crate::config::provider_capability(api_provider, &config.model)
.context_window,
),
show_thinking: config.show_thinking,
verbosity: config.verbosity.as_deref(),
},
);
// Store the prompt together with its hash on the session.
let stable_prompt = Some(system_prompt);
session.last_system_prompt_hash = Some(system_prompt_hash(stable_prompt.as_ref()));
session.system_prompt = stable_prompt;
// Ensure a prefix-stability manager exists without replacing one already set.
let _ = session.prefix_stability.get_or_insert_with(|| {
crate::prefix_cache::PrefixStabilityManager::new_unpinned()
});
let subagent_manager = new_shared_subagent_manager_with_timeout(
config.workspace.clone(),
config.max_subagents,
config.subagent_heartbeat_timeout,
config.interactive_launch_limit,
);
// Prefer a shell manager injected through runtime services; otherwise create one.
let shell_manager = config
.runtime_services
.shell_manager
.clone()
.unwrap_or_else(|| new_shared_shell_manager(config.workspace.clone()));
let capacity_controller = CapacityController::new(config.capacity.clone());
// The seam manager needs an API client, so it only exists when one was built.
// Every unset context option falls back to the seam_manager default.
let seam_manager = deepseek_client.as_ref().map(|main_client| {
let seam_config = SeamConfig {
enabled: api_config.context.enabled.unwrap_or(false),
verbatim_window_turns: api_config
.context
.verbatim_window_turns
.unwrap_or(crate::seam_manager::VERBATIM_WINDOW_TURNS),
l1_threshold: api_config
.context
.l1_threshold
.unwrap_or(crate::seam_manager::DEFAULT_L1_THRESHOLD),
l2_threshold: api_config
.context
.l2_threshold
.unwrap_or(crate::seam_manager::DEFAULT_L2_THRESHOLD),
l3_threshold: api_config
.context
.l3_threshold
.unwrap_or(crate::seam_manager::DEFAULT_L3_THRESHOLD),
seam_model: api_config
.context
.seam_model
.clone()
.unwrap_or_else(|| crate::seam_manager::DEFAULT_SEAM_MODEL.to_string()),
};
SeamManager::new(main_client.clone(), seam_config)
});
let lsp_manager = Arc::new(match config.lsp_config.clone() {
Some(cfg) => crate::lsp::LspManager::new(cfg, config.workspace.clone()),
None => crate::lsp::LspManager::disabled(),
});
// Workshop variables are only allocated when a workshop config is present.
let workshop_vars: Option<
std::sync::Arc<
tokio::sync::Mutex<crate::tools::large_output_router::WorkshopVariables>,
>,
> = if config.workshop.is_some() {
Some(std::sync::Arc::new(tokio::sync::Mutex::new(
crate::tools::large_output_router::WorkshopVariables::default(),
)))
} else {
None
};
// A sandbox backend creation error is logged and treated as "no backend".
let sandbox_backend = crate::sandbox::backend::create_backend(api_config)
.unwrap_or_else(|e| {
tracing::warn!("Failed to create sandbox backend: {e}");
None
})
.map(std::sync::Arc::from);
let mut engine = Engine {
config,
deepseek_client,
deepseek_client_error,
api_key_env_only_recovery,
session,
subagent_manager,
shell_manager,
mcp_pool: None,
api_provider,
rx_op,
rx_approval,
rx_user_input,
rx_steer,
tx_event,
tx_subagent_completion,
rx_subagent_completion,
cancel_token: cancel_token.clone(),
shared_cancel_token: shared_cancel_token.clone(),
cancel_reason: cancel_reason.clone(),
tool_exec_lock,
capacity_controller,
seam_manager,
coherence_state: CoherenceState::default(),
turn_counter: 0,
lsp_manager,
pending_lsp_blocks: Vec::new(),
slop_ledger_gate_cache: None,
workshop_vars,
sandbox_backend,
current_mode: AppMode::Agent,
token_estimate_cache: TokenEstimateCache::new(),
shared_paused: shared_paused.clone(),
};
engine.rehydrate_latest_canonical_state();
// The handle receives the opposite channel ends and the same shared state.
let handle = EngineHandle {
tx_op,
rx_event: Arc::new(RwLock::new(rx_event)),
cancel_token: shared_cancel_token,
cancel_reason,
tx_approval,
tx_user_input,
tx_steer,
shared_paused,
};
(engine, handle)
}
/// Runs a user-issued shell command as its own turn.
///
/// The command is executed through the `exec_shell` tool with
/// `"source": "user"`; audit records are tagged `composer_bang`. The method:
///
/// 1. resets cancellation state, bumps the turn counter and emits
///    `TurnStarted`;
/// 2. takes a pre-turn snapshot (awaited) when snapshots are enabled;
/// 3. emits `ToolCallStarted`, then either rejects the call (Plan mode,
///    shell feature disabled, tool not registered) or executes it, asking
///    for approval first when the tool requires it and auto-approve is off;
/// 4. applies output spillover, emits `ToolCallComplete` and `TurnComplete`;
/// 5. schedules a post-turn snapshot in the background when enabled.
///
/// Event sends are best-effort: a closed event channel is ignored.
async fn handle_run_shell_command(
    &mut self,
    command: String,
    mode: AppMode,
    trust_mode: bool,
    auto_approve: bool,
    approval_mode: crate::tui::approval::ApprovalMode,
) {
    self.reset_cancel_token();
    self.turn_counter = self.turn_counter.saturating_add(1);
    self.capacity_controller.mark_turn_start(self.turn_counter);

    // The turn id doubles as the tool-call id for this synthetic turn.
    let turn_id = format!(
        "{}{seq}",
        USER_SHELL_TOOL_ID_PREFIX,
        seq = self.turn_counter
    );
    let tool_id = turn_id.clone();
    let tool_name = "exec_shell".to_string();
    let tool_input = json!({ "command": command, "source": "user" });
    // `command` was moved into the JSON payload; read it back for snapshots.
    let snapshot_prompt = tool_input["command"]
        .as_str()
        .unwrap_or_default()
        .to_string();

    self.session.trust_mode = trust_mode;
    self.config.trust_mode = trust_mode;
    self.session.auto_approve = auto_approve;
    let agent_approval_mode = agent_approval_mode_for_turn(auto_approve, approval_mode);
    if mode == AppMode::Agent {
        self.session.approval_mode = agent_approval_mode;
    }

    let _ = self
        .tx_event
        .send(Event::TurnStarted {
            turn_id: turn_id.clone(),
        })
        .await;

    // The pre-turn snapshot is awaited so it finishes before the command runs.
    if self.config.snapshots_enabled {
        let pre_workspace = self.session.workspace.clone();
        let pre_seq = self.turn_counter;
        let pre_cap = self.config.snapshots_max_workspace_bytes;
        let pre_prompt = snapshot_prompt.clone();
        let _ = tokio::task::spawn_blocking(move || {
            pre_turn_snapshot(&pre_workspace, pre_seq, pre_cap, Some(&pre_prompt))
        })
        .await;
    }

    let _ = self
        .tx_event
        .send(Event::ToolCallStarted {
            id: tool_id.clone(),
            name: tool_name.clone(),
            input: tool_input.clone(),
        })
        .await;

    let tool_context = self.build_tool_context(mode, auto_approve);
    let registry = ToolRegistryBuilder::new()
        .with_shell_tools()
        .build(tool_context);

    let mut result = if mode == AppMode::Plan {
        Err(ToolError::permission_denied(
            "Tool 'exec_shell' is unavailable in Plan mode".to_string(),
        ))
    } else if !self.config.features.enabled(Feature::ShellTool) {
        Err(ToolError::not_available(
            "Tool 'exec_shell' is disabled by feature flag".to_string(),
        ))
    } else if let Some(spec) = registry.get(&tool_name) {
        let approval_required = spec.approval_requirement() != ApprovalRequirement::Auto
            && !registry.context().auto_approve;
        if approval_required {
            emit_tool_audit(json!({
                "event": "tool.approval_required",
                "tool_id": tool_id.clone(),
                "tool_name": tool_name.clone(),
                "source": "composer_bang",
            }));
            let approval_key =
                crate::tools::approval_cache::build_approval_key(&tool_name, &tool_input).0;
            let approval_grouping_key =
                crate::tools::approval_cache::build_approval_grouping_key(
                    &tool_name,
                    &tool_input,
                )
                .0;
            let _ = self
                .tx_event
                .send(Event::ApprovalRequired {
                    id: tool_id.clone(),
                    tool_name: tool_name.clone(),
                    input: tool_input.clone(),
                    description: spec.description().to_string(),
                    approval_key,
                    approval_grouping_key,
                    intent_summary: None,
                })
                .await;
            match self.await_tool_approval(&tool_id).await {
                Ok(ApprovalResult::Approved) => {
                    emit_tool_audit(json!({
                        "event": "tool.approval_decision",
                        "tool_id": tool_id.clone(),
                        "tool_name": tool_name.clone(),
                        "decision": "approved",
                        "source": "composer_bang",
                    }));
                    Self::execute_tool_with_lock(
                        self.tool_exec_lock.clone(),
                        spec.supports_parallel(),
                        false,
                        self.tx_event.clone(),
                        tool_name.clone(),
                        tool_input.clone(),
                        Some(&registry),
                        None,
                        None,
                    )
                    .await
                }
                Ok(ApprovalResult::Denied) => {
                    emit_tool_audit(json!({
                        "event": "tool.approval_decision",
                        "tool_id": tool_id.clone(),
                        "tool_name": tool_name.clone(),
                        "decision": "denied",
                        "source": "composer_bang",
                    }));
                    Err(ToolError::permission_denied(format!(
                        "Tool '{tool_name}' denied by user"
                    )))
                }
                Ok(ApprovalResult::RetryWithPolicy(policy)) => {
                    emit_tool_audit(json!({
                        "event": "tool.approval_decision",
                        "tool_id": tool_id.clone(),
                        "tool_name": tool_name.clone(),
                        "decision": "retry_with_policy",
                        "policy": format!("{policy:?}"),
                        "source": "composer_bang",
                    }));
                    // Run with a copy of the tool context carrying the elevated policy.
                    let elevated_context = registry
                        .context()
                        .clone()
                        .with_elevated_sandbox_policy(policy);
                    Self::execute_tool_with_lock(
                        self.tool_exec_lock.clone(),
                        spec.supports_parallel(),
                        false,
                        self.tx_event.clone(),
                        tool_name.clone(),
                        tool_input.clone(),
                        Some(&registry),
                        None,
                        Some(elevated_context),
                    )
                    .await
                }
                Err(err) => Err(err),
            }
        } else {
            // No approval needed: execute directly.
            Self::execute_tool_with_lock(
                self.tool_exec_lock.clone(),
                spec.supports_parallel(),
                false,
                self.tx_event.clone(),
                tool_name.clone(),
                tool_input.clone(),
                Some(&registry),
                None,
                None,
            )
            .await
        }
    } else {
        Err(ToolError::not_available(
            "tool 'exec_shell' is not registered".to_string(),
        ))
    };

    // Record an audit entry when the spillover helper reports an artifact path.
    if let Ok(tool_result) = result.as_mut()
        && let Some(path) = crate::tools::truncate::apply_spillover_with_artifact(
            tool_result,
            &tool_id,
            &tool_name,
            &self.session.id,
        )
    {
        emit_tool_audit(json!({
            "event": "tool.spillover",
            "tool_id": tool_id.clone(),
            "tool_name": tool_name.clone(),
            "path": path.display().to_string(),
            "source": "composer_bang",
        }));
    }

    // Capture status and error text before `result` is moved into the event.
    let status = if result.is_err() {
        TurnOutcomeStatus::Failed
    } else {
        TurnOutcomeStatus::Completed
    };
    let error = result.as_ref().err().map(ToString::to_string);

    let _ = self
        .tx_event
        .send(Event::ToolCallComplete {
            id: tool_id,
            name: tool_name,
            result,
        })
        .await;
    let _ = self
        .tx_event
        .send(Event::TurnComplete {
            usage: Usage::default(),
            status,
            error,
            tool_catalog: None,
            base_url: None,
        })
        .await;

    // The post-turn snapshot runs in the background and is not awaited.
    if self.config.snapshots_enabled {
        let post_workspace = self.session.workspace.clone();
        let post_seq = self.turn_counter;
        let post_cap = self.config.snapshots_max_workspace_bytes;
        crate::utils::spawn_blocking_supervised("post-shell-turn-snapshot", move || {
            post_turn_snapshot(&post_workspace, post_seq, post_cap, Some(&snapshot_prompt));
        });
    }
}
/// Main engine loop: receives `Op`s from `rx_op` and dispatches each one.
///
/// The loop ends when the op channel closes or an `Op::Shutdown` arrives.
/// Afterwards, if an MCP pool was created, all of its connections are shut
/// down. Event sends throughout are best-effort (`let _ =`).
#[allow(clippy::too_many_lines)]
pub async fn run(mut self) {
while let Some(op) = self.rx_op.recv().await {
match op {
// Forward every field of the op unchanged to `handle_send_message`.
Op::SendMessage {
content,
mode,
model,
goal_objective,
goal_token_budget,
goal_status,
reasoning_effort,
reasoning_effort_auto,
auto_model,
allow_shell,
trust_mode,
auto_approve,
approval_mode,
translation_enabled,
show_thinking,
allowed_tools,
hook_executor,
verbosity,
} => {
self.handle_send_message(
content,
mode,
model,
goal_objective,
goal_token_budget,
goal_status,
reasoning_effort,
reasoning_effort_auto,
auto_model,
allow_shell,
trust_mode,
auto_approve,
approval_mode,
translation_enabled,
show_thinking,
allowed_tools,
hook_executor,
verbosity,
)
.await;
}
Op::RunShellCommand {
command,
mode,
trust_mode,
auto_approve,
approval_mode,
} => {
self.handle_run_shell_command(
command,
mode,
trust_mode,
auto_approve,
approval_mode,
)
.await;
}
// Cancel the current token, then install a fresh one; the reset also
// clears the shared cancel reason and the paused flag.
Op::CancelRequest => {
self.cancel_token.cancel();
self.reset_cancel_token();
}
// In this loop, approve/deny ops only produce a status event.
Op::ApproveToolCall { id } => {
let _ = self
.tx_event
.send(Event::status(format!("Approved tool call: {id}")))
.await;
}
Op::DenyToolCall { id } => {
let _ = self
.tx_event
.send(Event::status(format!("Denied tool call: {id}")))
.await;
}
Op::SpawnSubAgent { prompt } => {
// Without an API client, report a fatal error (including the stored
// construction error when there is one) and move on to the next op.
let Some(client) = self.deepseek_client.clone() else {
let message = self
.deepseek_client_error
.as_deref()
.map(|err| format!("Failed to spawn sub-agent: {err}"))
.unwrap_or_else(|| {
"Failed to spawn sub-agent: API client not configured".to_string()
});
let _ = self
.tx_event
.send(Event::error(ErrorEnvelope::fatal(message)))
.await;
continue;
};
// An MCP pool error is discarded here: the sub-agent then runs without a pool.
let mcp_pool = if self.config.features.enabled(Feature::Mcp) {
self.ensure_mcp_pool().await.ok()
} else {
None
};
let mut runtime = SubAgentRuntime::new(
client,
self.session.model.clone(),
self.build_tool_context(AppMode::Agent, self.session.auto_approve),
self.session.allow_shell,
Some(self.tx_event.clone()),
Arc::clone(&self.subagent_manager),
)
.with_role_models(self.config.subagent_model_overrides.clone())
.with_auto_model(self.session.auto_model)
.with_reasoning_effort(
self.session.reasoning_effort.clone(),
self.session.reasoning_effort_auto,
)
.with_max_spawn_depth(self.config.max_spawn_depth)
.with_step_api_timeout(self.config.subagent_api_timeout)
.with_speech_output_dir(self.config.speech_output_dir.clone())
.with_mcp_pool(mcp_pool)
.background_runtime();
// Resolve the route for this prompt and pin its model and reasoning
// effort on the runtime; automatic effort selection is then turned off.
let route = resolve_subagent_assignment_route(
&runtime,
None,
&prompt,
&SubAgentType::General,
)
.await;
runtime.model = route.model;
runtime.reasoning_effort = route.reasoning_effort;
runtime.reasoning_effort_auto = false;
// Hold the manager write lock only for the spawn call itself.
let result = {
let mut manager = self.subagent_manager.write().await;
manager.spawn_background(
Arc::clone(&self.subagent_manager),
runtime,
SubAgentType::General,
prompt.clone(),
None,
)
};
match result {
Ok(snapshot) => {
let _ = self
.tx_event
.send(Event::status(format!(
"Spawned sub-agent {}",
snapshot.agent_id
)))
.await;
}
Err(err) => {
let _ = self
.tx_event
.send(Event::error(ErrorEnvelope::fatal(format!(
"Failed to spawn sub-agent: {err}"
))))
.await;
}
}
}
Op::ListSubAgents => {
// Run the manager's cleanup with a one-hour duration before listing.
let agents = {
let mut manager = self.subagent_manager.write().await;
manager.cleanup(Duration::from_secs(60 * 60));
manager.list()
};
let _ = self.tx_event.send(Event::AgentList { agents }).await;
}
Op::ChangeMode { mode } => {
self.current_mode = mode;
self.emit_session_updated().await;
let _ = self
.tx_event
.send(Event::status(format!(
"Mode changed to: {}",
mode.description()
)))
.await;
}
Op::SetModel { model, mode: _ } => {
// The literal model name "auto" (case-insensitive, trimmed) enables auto-model.
self.session.auto_model = model.trim().eq_ignore_ascii_case("auto");
self.session.model = model;
self.config.model.clone_from(&self.session.model);
self.refresh_system_prompt();
self.emit_session_updated().await;
let _ = self
.tx_event
.send(Event::status(format!(
"Model set to: {}",
self.session.model
)))
.await;
}
Op::SetCompaction { config } => {
let enabled = config.enabled;
self.config.compaction = config;
let _ = self
.tx_event
.send(Event::status(format!(
"Auto-compaction {}",
if enabled { "enabled" } else { "disabled" }
)))
.await;
}
Op::SetStreamChunkTimeout { timeout_secs } => {
self.config.stream_chunk_timeout = Duration::from_secs(timeout_secs);
let _ = self
.tx_event
.send(Event::status(format!(
"Stream chunk timeout set to {timeout_secs}s"
)))
.await;
}
Op::SyncSession {
session_id,
messages,
system_prompt,
system_prompt_override,
model,
workspace,
} => {
// Adopt the supplied session id; when none is given and the sync
// carries no messages or prompt, start a fresh random id.
if let Some(session_id) = session_id {
self.session.id = session_id;
} else if messages.is_empty() && system_prompt.is_none() {
self.session.id = uuid::Uuid::new_v4().to_string();
}
self.session.messages = messages.into();
self.session.compaction_summary_prompt =
extract_compaction_summary_prompt(system_prompt.clone());
self.session.system_prompt = system_prompt;
self.session.last_system_prompt_hash =
Some(system_prompt_hash(self.session.system_prompt.as_ref()));
// The override flag only holds when a system prompt is actually present.
self.session.system_prompt_override =
system_prompt_override && self.session.system_prompt.is_some();
self.session.auto_model = model.trim().eq_ignore_ascii_case("auto");
self.session.model = model;
self.session.workspace = workspace.clone();
self.config.model.clone_from(&self.session.model);
self.config.workspace = workspace.clone();
// Reload project context for the new workspace; keep it only when it has instructions.
let ctx = crate::project_context::load_project_context_with_parents(&workspace);
self.session.project_context = if ctx.has_instructions() {
Some(ctx)
} else {
None
};
self.session.rebuild_working_set();
self.rehydrate_latest_canonical_state();
self.emit_session_updated().await;
let _ = self
.tx_event
.send(Event::status("Session context synced".to_string()))
.await;
}
Op::CompactContext => {
self.handle_manual_compaction().await;
}
Op::PurgeContext => {
self.handle_purge().await;
}
Op::EditLastTurn { new_message } => {
// Find the index of the most recent message with role "user".
let mut cut = None;
for (idx, msg) in self.session.messages.iter().enumerate().rev() {
if msg.role == "user" {
cut = Some(idx);
break;
}
}
// NOTE(review): presumably `truncate_to(idx)` drops that user message
// and everything after it — confirm against the message store.
if let Some(idx) = cut {
self.session.messages.truncate_to(idx);
self.session.bump_messages_revision();
}
// Resend the edited message using the current session/config settings.
// NOTE(review): the mode is fixed to Agent rather than `self.current_mode` — confirm this is intended.
let mode = AppMode::Agent; self.handle_send_message(
new_message,
mode,
self.session.model.clone(),
self.config.goal_objective.clone(),
self.config.goal_token_budget,
self.config.goal_status,
self.session.reasoning_effort.clone(),
self.session.reasoning_effort_auto,
self.session.auto_model,
self.session.allow_shell,
self.session.trust_mode,
self.session.auto_approve,
self.session.approval_mode,
self.config.translation_enabled,
self.config.show_thinking,
self.config.allowed_tools.clone(),
self.config.hook_executor.clone(),
self.config.verbosity.clone(),
)
.await;
}
Op::Shutdown => {
break;
}
}
}
// After the loop ends, shut down the MCP pool if one was ever created.
if let Some(pool) = self.mcp_pool.as_ref() {
let mut guard = pool.lock().await;
guard.shutdown_all().await;
}
}
/// Publishes the current session state on the event channel.
///
/// The event carries clones of the session id, message history, system
/// prompt, model and workspace. The send result is discarded, so a failed
/// send is ignored.
async fn emit_session_updated(&self) {
    let update = Event::SessionUpdated {
        session_id: self.session.id.clone(),
        messages: self.session.messages.clone().into(),
        system_prompt: self.session.system_prompt.clone(),
        model: self.session.model.clone(),
        workspace: self.session.workspace.clone(),
    };
    // Best-effort notification: the result is intentionally ignored.
    let _ = self.tx_event.send(update).await;
}
/// Returns a snapshot of the shared goal state, but only when it has an
/// objective.
///
/// Yields `None` when the snapshot has no objective, or when the goal-state
/// lock is poisoned (a warning is logged in that case).
fn goal_snapshot_for_event(&self) -> Option<GoalSnapshot> {
    let guard = match self.config.goal_state.lock() {
        Ok(guard) => guard,
        Err(err) => {
            tracing::warn!("goal state lock poisoned while emitting goal update: {err}");
            return None;
        }
    };
    // Snapshots without an objective are not worth reporting.
    Some(guard.snapshot()).filter(|current| current.objective.is_some())
}
/// Sends `Event::GoalUpdated` when `goal_snapshot_for_event` yields a
/// snapshot; otherwise does nothing. The send result is ignored.
async fn emit_goal_updated(&self) {
    let Some(snapshot) = self.goal_snapshot_for_event() else {
        return;
    };
    let _ = self.tx_event.send(Event::GoalUpdated { snapshot }).await;
}
/// Appends `message` to the session, then emits a session-updated event so
/// listeners see the new history.
async fn add_session_message(&mut self, message: Message) {
    self.session.add_message(message);
    self.emit_session_updated().await
}
/// Builds the `<turn_meta>` text block attached to a user message.
///
/// The block always lists the local date and the routed model. It adds an
/// "Auto model route" line when `auto_model` is set, an "Auto reasoning
/// effort" line when `reasoning_effort_auto` is set and an effort is given,
/// and the trimmed working-set summary when that summary is non-empty.
fn turn_metadata_block(
    &self,
    routed_model: &str,
    auto_model: bool,
    reasoning_effort: Option<&str>,
    reasoning_effort_auto: bool,
) -> ContentBlock {
    let today = chrono::Local::now().format("%Y-%m-%d").to_string();

    // Optional lines are modelled as `Option<String>` and flattened below.
    let auto_route_line = auto_model.then(|| format!("Auto model route: {routed_model}"));
    let effort_line = reasoning_effort
        .filter(|_| reasoning_effort_auto)
        .map(|reasoning_effort| format!("Auto reasoning effort: {reasoning_effort}"));
    let working_set_line = self
        .session
        .working_set
        .summary_block(&self.config.workspace)
        .map(|raw| raw.trim().to_string())
        .filter(|trimmed| !trimmed.is_empty());

    let summary = [
        Some(format!("Current local date: {today}")),
        Some(format!("Current model: {routed_model}")),
        auto_route_line,
        effort_line,
        working_set_line,
    ]
    .into_iter()
    .flatten()
    .collect::<Vec<String>>()
    .join("\n");

    ContentBlock::Text {
        text: format!("<turn_meta>\n{summary}\n</turn_meta>"),
        cache_control: None,
    }
}
/// Builds a `user`-role message holding the runtime prompt tag for the
/// current mode, the approval mode derived from it, and the session's
/// `allow_shell` flag.
fn runtime_prompt_message(&self) -> Message {
    let effective_approval = approval_mode_for(self.current_mode, self.session.approval_mode);
    let text = runtime_prompt_text(
        self.current_mode,
        effective_approval,
        self.session.allow_shell,
    );
    let block = ContentBlock::Text {
        text,
        cache_control: None,
    };
    Message {
        role: String::from("user"),
        content: vec![block],
    }
}
/// Wraps `text` in a user message with turn metadata, using the session's
/// own model, auto-model flag and reasoning-effort settings as the route.
fn user_text_message_with_turn_metadata(&self, text: String) -> Message {
    let session = &self.session;
    let effort = session.reasoning_effort.as_deref();
    self.user_text_message_with_turn_metadata_for_route(
        text,
        &session.model,
        session.auto_model,
        effort,
        session.reasoning_effort_auto,
    )
}
/// Builds a `user`-role message with two blocks: the caller's `text`
/// followed by the turn metadata block for the given route parameters.
fn user_text_message_with_turn_metadata_for_route(
    &self,
    text: String,
    routed_model: &str,
    auto_model: bool,
    reasoning_effort: Option<&str>,
    reasoning_effort_auto: bool,
) -> Message {
    let metadata = self.turn_metadata_block(
        routed_model,
        auto_model,
        reasoning_effort,
        reasoning_effort_auto,
    );
    let body = ContentBlock::Text {
        text,
        cache_control: None,
    };
    // The user's text comes first, the metadata block second.
    Message {
        role: String::from("user"),
        content: vec![body, metadata],
    }
}
/// Runs one complete user turn.
///
/// In order, this: resets per-turn state and emits `TurnStarted`; optionally
/// takes a pre-turn workspace snapshot; bails out with a failed
/// `TurnComplete` when no API client is configured; records the user message
/// (with turn metadata); copies the host-provided settings into the session
/// and config; rebuilds the system prompt; assembles the tool registry and
/// the model-facing tool catalog; drives `handle_deepseek_turn`; and finally
/// emits `TurnComplete` and optionally schedules a post-turn snapshot.
///
/// All parameters after `content` are the host's settings for this turn and
/// overwrite the corresponding session/config fields.
#[allow(clippy::too_many_arguments)]
async fn handle_send_message(
&mut self,
content: String,
mode: AppMode,
model: String,
goal_objective: Option<String>,
goal_token_budget: Option<u32>,
goal_status: GoalStatus,
reasoning_effort: Option<String>,
reasoning_effort_auto: bool,
auto_model: bool,
allow_shell: bool,
trust_mode: bool,
auto_approve: bool,
approval_mode: crate::tui::approval::ApprovalMode,
translation_enabled: bool,
show_thinking: bool,
allowed_tools: Option<Vec<String>>,
hook_executor: Option<std::sync::Arc<crate::hooks::HookExecutor>>,
verbosity: Option<String>,
) {
self.reset_cancel_token();
self.current_mode = mode;
// Discard any steering messages still queued from before this turn.
while self.rx_steer.try_recv().is_ok() {}
let mut turn = TurnContext::new(self.config.max_steps);
self.turn_counter = self.turn_counter.saturating_add(1);
self.capacity_controller.mark_turn_start(self.turn_counter);
let _ = self
.tx_event
.send(Event::TurnStarted {
turn_id: turn.id.clone(),
})
.await;
// Pre-turn snapshot: runs on the blocking pool and is awaited before the
// turn proceeds; its result is ignored.
if self.config.snapshots_enabled {
let snapshot_prompt = content.clone();
let pre_workspace = self.session.workspace.clone();
let pre_seq = self.turn_counter;
let pre_cap = self.config.snapshots_max_workspace_bytes;
let _ = tokio::task::spawn_blocking(move || {
pre_turn_snapshot(&pre_workspace, pre_seq, pre_cap, Some(&snapshot_prompt))
})
.await;
}
crate::retry_status::clear();
// Kept for the post-turn snapshot, since `content` is moved into the user
// message below.
let snapshot_prompt_post = content.clone();
// Without an API client the turn cannot run: report the error, close the
// turn as failed and return before touching session state.
if self.deepseek_client.is_none() {
let message = self
.deepseek_client_error
.as_deref()
.map(|err| format!("Failed to send message: {err}"))
.unwrap_or_else(|| "Failed to send message: API client not configured".to_string());
let _ = self
.tx_event
.send(Event::error(ErrorEnvelope::fatal_auth(message.clone())))
.await;
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: turn.usage.clone(),
status: TurnOutcomeStatus::Failed,
error: Some(message),
tool_catalog: None,
base_url: None,
})
.await;
return;
}
self.session
.working_set
.observe_user_message(&content, &self.session.workspace);
let force_update_plan_first = should_force_update_plan_first(mode, &content);
let agent_approval_mode = agent_approval_mode_for_turn(auto_approve, approval_mode);
self.session.auto_approve = auto_approve;
// The session's approval mode is only overwritten for Agent-mode turns.
if mode == AppMode::Agent {
self.session.approval_mode = agent_approval_mode;
}
// The user message is built from this turn's route parameters, before
// they are written into the session below.
let user_msg = self.user_text_message_with_turn_metadata_for_route(
content,
&model,
auto_model,
reasoning_effort.as_deref(),
reasoning_effort_auto,
);
self.session.add_message(user_msg);
let previous_goal_objective = self.config.goal_objective.clone();
let previous_goal_token_budget = self.config.goal_token_budget;
let previous_goal_status = self.config.goal_status;
self.session.model = model;
self.config.model.clone_from(&self.session.model);
self.config.goal_objective = goal_objective.clone();
self.config.goal_token_budget = goal_token_budget;
self.config.goal_status = goal_status;
// Push the host's goal into the shared goal state only when the
// normalized objective, the budget or the status actually changed.
if normalized_goal_objective(previous_goal_objective.as_deref())
!= normalized_goal_objective(goal_objective.as_deref())
|| previous_goal_token_budget != goal_token_budget
|| previous_goal_status != goal_status
{
sync_goal_state_from_host(
&self.config.goal_state,
normalized_goal_objective(goal_objective.as_deref()).as_deref(),
goal_token_budget,
goal_status,
);
}
// Copy the remaining per-turn host settings into session and config.
self.config.allowed_tools = allowed_tools;
self.config.hook_executor = hook_executor;
self.session.reasoning_effort = reasoning_effort;
self.session.reasoning_effort_auto = reasoning_effort_auto;
self.session.auto_model = auto_model;
self.session.allow_shell = allow_shell;
self.config.allow_shell = allow_shell;
self.session.trust_mode = trust_mode;
self.config.trust_mode = trust_mode;
self.config.translation_enabled = translation_enabled;
self.config.show_thinking = show_thinking;
self.config.verbosity = verbosity;
self.refresh_system_prompt();
self.emit_session_updated().await;
let todo_list = self.config.todos.clone();
let plan_state = self.config.plan_state.clone();
let tool_context = self.build_tool_context(mode, auto_approve);
let builder = self.build_turn_tool_registry_builder(mode, todo_list, plan_state);
// With sub-agents enabled, capture the state a sub-agent is given via
// `with_fork_context` below: system prompt, messages and a structured
// state block.
let fork_context_for_runtime = if self.config.features.enabled(Feature::Subagents) {
let state = StructuredState::capture(
mode.label(),
self.config.workspace.clone(),
std::env::current_dir().ok(),
&self.session.working_set,
&self.config.todos,
&self.config.plan_state,
Some(&self.subagent_manager),
)
.await;
Some(SubAgentForkContext {
system: self.session.system_prompt.clone(),
messages: self.messages_with_turn_metadata(),
structured_state_block: state.to_system_block(),
})
} else {
None
};
// With sub-agents enabled, create a mailbox tied to a child cancel token
// and spawn a task that forwards each envelope as `Event::SubAgentMailbox`
// until the receiver ends or the event channel send fails.
let mailbox_for_runtime = if self.config.features.enabled(Feature::Subagents) {
let cancel_token = self.cancel_token.child_token();
let (mailbox, mut receiver) = Mailbox::new(cancel_token.clone());
let tx_event_clone = self.tx_event.clone();
spawn_supervised(
"subagent-mailbox-drainer",
std::panic::Location::caller(),
async move {
while let Some(envelope) = receiver.recv().await {
if tx_event_clone
.send(Event::SubAgentMailbox {
seq: envelope.seq,
message: envelope.message,
})
.await
.is_err()
{
break;
}
}
},
);
Some((mailbox, cancel_token))
} else {
None
};
// A failure to set up the MCP pool is tolerated here (`.ok()`); the
// sub-agent runtime then simply receives `None`.
let mcp_pool = if self.config.features.enabled(Feature::Mcp) {
self.ensure_mcp_pool().await.ok()
} else {
None
};
// Build the tool registry. Agent/Yolo turns additionally get sub-agent
// tools when the feature is enabled and an API client is available.
let mut tool_registry = match mode {
AppMode::Agent | AppMode::Yolo => {
if self.config.features.enabled(Feature::Subagents) {
let runtime = if let Some(client) = self.deepseek_client.clone() {
let mut rt = SubAgentRuntime::new(
client,
self.session.model.clone(),
tool_context.clone(),
self.session.allow_shell,
Some(self.tx_event.clone()),
Arc::clone(&self.subagent_manager),
)
.with_role_models(self.config.subagent_model_overrides.clone())
.with_auto_model(self.session.auto_model)
.with_reasoning_effort(
self.session.reasoning_effort.clone(),
self.session.reasoning_effort_auto,
)
.with_max_spawn_depth(self.config.max_spawn_depth)
.with_step_api_timeout(self.config.subagent_api_timeout)
.with_speech_output_dir(self.config.speech_output_dir.clone())
.with_mcp_pool(mcp_pool.clone())
.with_parent_completion_tx(self.tx_subagent_completion.clone());
if let Some(context) = fork_context_for_runtime.clone() {
rt = rt.with_fork_context(context);
}
if let Some((mailbox, cancel_token)) = mailbox_for_runtime.as_ref() {
rt = rt
.with_mailbox(mailbox.clone())
.with_cancel_token(cancel_token.clone());
}
Some(rt)
} else {
None
};
if let Some(subagent_runtime) = runtime {
Some(
builder
.with_subagent_tools(
self.subagent_manager.clone(),
subagent_runtime,
)
.build(tool_context),
)
} else {
tracing::warn!(
"Sub-agents enabled but no API client available, falling back to basic tool set"
);
Some(builder.build(tool_context))
}
} else {
Some(builder.build(tool_context))
}
}
_ => Some(builder.build(tool_context)),
};
// Load plugin tools into the registry and remember which names they added.
let mut plugin_tool_names: std::collections::HashSet<String> =
std::collections::HashSet::new();
if let Some(ref mut tool_registry) = tool_registry {
plugin_tool_names = configure_plugin_tools(tool_registry, self.config.tools.as_ref());
}
let mcp_tools = if self.config.features.enabled(Feature::Mcp) {
self.mcp_tools().await
} else {
Vec::new()
};
// Model-facing catalog: registry tools plus MCP tools. Plugin-provided
// tools get `defer_loading = Some(false)`, then the allow/deny gates are
// applied.
let tools = tool_registry.as_ref().map(|registry| {
let mut catalog = build_model_tool_catalog(
registry.to_api_tools_with_cache(true),
mcp_tools,
mode,
&self.config.tools_always_load,
);
for tool in &mut catalog {
if plugin_tool_names.contains(&tool.name) {
tool.defer_loading = Some(false);
}
}
filter_tool_catalog_for_gates(
&mut catalog,
self.config.allowed_tools.as_deref(),
self.config.disallowed_tools.as_deref(),
);
catalog
});
// Captured now because `tools` is moved into the turn below.
let tool_catalog_for_event = tools.clone();
let base_url_for_event = self
.deepseek_client
.as_ref()
.map(|client| client.base_url().to_string());
use futures_util::FutureExt as _;
// Run the turn under `catch_unwind` so a panic inside it becomes a failed
// turn with a user-facing message instead of propagating.
let turn_result = std::panic::AssertUnwindSafe(self.handle_deepseek_turn(
&mut turn,
tool_registry.as_ref(),
tools,
mode,
force_update_plan_first,
))
.catch_unwind()
.await;
let (status, error) = match turn_result {
Ok(outcome) => outcome,
Err(panic) => {
let detail = crate::utils::panic_message(&*panic);
crate::utils::record_caught_panic("engine-event-loop", &detail);
(
TurnOutcomeStatus::Failed,
Some(format!(
"The engine hit an internal error and stopped this turn: {detail}. \
Your session is intact — send your message again to retry. \
A crash report was saved to ~/.codewhale/crashes/."
)),
)
}
};
self.session.total_usage.add(&turn.usage);
self.emit_goal_updated().await;
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: turn.usage,
status,
error,
tool_catalog: tool_catalog_for_event,
base_url: base_url_for_event,
})
.await;
// Post-turn snapshot is handed to `spawn_blocking_supervised` and is not
// awaited here.
if self.config.snapshots_enabled {
let post_workspace = self.session.workspace.clone();
let post_seq = self.turn_counter;
let post_cap = self.config.snapshots_max_workspace_bytes;
crate::utils::spawn_blocking_supervised("post-turn-snapshot", move || {
post_turn_snapshot(
&post_workspace,
post_seq,
post_cap,
Some(&snapshot_prompt_post),
);
});
}
}
async fn handle_manual_compaction(&mut self) {
let id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]);
let zero_usage = Usage {
input_tokens: 0,
output_tokens: 0,
..Usage::default()
};
let Some(client) = self.deepseek_client.clone() else {
let message = "Manual compaction unavailable: API client not configured".to_string();
self.emit_compaction_failed(id, false, message.clone())
.await;
let _ = self
.tx_event
.send(Event::error(ErrorEnvelope::fatal_auth(message.clone())))
.await;
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: zero_usage,
status: TurnOutcomeStatus::Failed,
error: Some(message),
tool_catalog: None,
base_url: None,
})
.await;
return;
};
let start_message = "Manual context compaction started".to_string();
self.emit_compaction_started(id.clone(), false, start_message)
.await;
let compaction_pins = self
.session
.working_set
.pinned_message_indices(&self.session.messages, &self.session.workspace);
let compaction_paths = self.session.working_set.top_paths(24);
let messages_before = self.session.messages.len();
let mut turn_status = TurnOutcomeStatus::Completed;
let mut turn_error = None;
match compact_messages_safe(
&client,
&self.session.messages,
&self.config.compaction,
Some(&self.session.workspace),
Some(&compaction_pins),
Some(&compaction_paths),
)
.await
{
Ok(result) => {
if !result.messages.is_empty() || self.session.messages.is_empty() {
let messages_after = result.messages.len();
self.session.messages = result.messages.into();
self.merge_compaction_summary(result.summary_prompt);
self.emit_session_updated().await;
let removed = messages_before.saturating_sub(messages_after);
let message = if result.retries_used > 0 {
format!(
"Compaction complete: {messages_before} → {messages_after} messages ({removed} removed, {} retries)",
result.retries_used
)
} else {
format!(
"Compaction complete: {messages_before} → {messages_after} messages ({removed} removed)"
)
};
self.emit_compaction_completed(
id,
false,
message,
Some(messages_before),
Some(messages_after),
)
.await;
} else {
let message = "Compaction skipped: produced empty result".to_string();
self.emit_compaction_failed(id, false, message.clone())
.await;
turn_status = TurnOutcomeStatus::Failed;
turn_error = Some(message);
}
}
Err(err) => {
let message = format!("Manual context compaction failed: {err}");
self.emit_compaction_failed(id, false, message.clone())
.await;
let _ = self.tx_event.send(Event::status(message.clone())).await;
turn_status = TurnOutcomeStatus::Failed;
turn_error = Some(message);
}
}
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: zero_usage,
status: turn_status,
error: turn_error,
tool_catalog: None,
base_url: None,
})
.await;
}
async fn handle_purge(&mut self) {
let zero_usage = Usage {
input_tokens: 0,
output_tokens: 0,
..Usage::default()
};
let Some(client) = self.deepseek_client.clone() else {
let message = "Purge unavailable: API client not configured".to_string();
emit_purge_failed(&self.tx_event, message.clone()).await;
let _ = self
.tx_event
.send(Event::error(ErrorEnvelope::fatal_auth(message.clone())))
.await;
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: zero_usage,
status: TurnOutcomeStatus::Failed,
error: Some(message),
tool_catalog: None,
base_url: None,
})
.await;
return;
};
emit_purge_started(
&self.tx_event,
"Agent context purge in progress\u{2026}".to_string(),
)
.await;
let messages_before = self.session.messages.len();
let (status, error) = match run_purge(
&client,
&self.session.messages,
&self.session.model,
self.session.reasoning_effort.clone(),
effective_max_output_tokens(&self.session.model),
)
.await
{
Ok(result) => {
let messages_after = result.messages.len();
self.session.messages = result.messages.into();
self.emit_session_updated().await;
let summary = format!(
"Purge complete: {messages_before} → {messages_after} messages \
({} removed, {} condensed)",
result.removed_count, result.replaced_count,
);
emit_purge_completed(
&self.tx_event,
messages_before,
messages_after,
result.removed_count,
result.replaced_count,
summary,
)
.await;
(TurnOutcomeStatus::Completed, None)
}
Err(e) => {
emit_purge_failed(&self.tx_event, e.clone()).await;
(TurnOutcomeStatus::Failed, Some(e))
}
};
let _ = self
.tx_event
.send(Event::TurnComplete {
usage: zero_usage,
status,
error,
tool_catalog: None,
base_url: None,
})
.await;
}
/// Returns the estimated input token count for the current system prompt
/// and messages, via the token-estimate cache keyed on the session's
/// message revision.
fn estimated_input_tokens(&mut self) -> usize {
    let revision = self.session.messages_revision;
    let system_prompt = self.session.system_prompt.as_ref();
    self.token_estimate_cache
        .lookup_or_compute(revision, system_prompt, &self.session.messages)
}
/// Drops messages from the front of the history, one at a time, until the
/// estimated input size fits `target_input_budget` or only
/// `MIN_RECENT_MESSAGES_TO_KEEP` messages remain.
///
/// Returns the number of messages removed.
fn trim_oldest_messages_to_budget(&mut self, target_input_budget: usize) -> usize {
    let mut dropped: usize = 0;
    loop {
        // Never trim below the protected tail of recent messages.
        if self.session.messages.len() <= MIN_RECENT_MESSAGES_TO_KEEP {
            break;
        }
        if self.estimated_input_tokens() <= target_input_budget {
            break;
        }
        self.session.messages.trim_front(1);
        self.session.bump_messages_revision();
        dropped = dropped.saturating_add(1);
    }
    dropped
}
/// Tries to shrink the session context after a context-overflow condition.
///
/// Runs one `compact_messages_safe` pass with a forced-on compaction config,
/// then trims the oldest messages locally until the estimate fits the
/// provider's input budget. Emits compaction started/completed/failed events
/// tagged with `true` for the flag that manual compaction passes as `false`.
///
/// `reason` is only used in the "started" status text.
///
/// Returns `true` when the final estimate is within budget and something
/// actually shrank (tokens, message count, or at least one trimmed message);
/// returns `false` otherwise, including when no input budget is known for
/// the current provider/model.
async fn recover_context_overflow(&mut self, client: &DeepSeekClient, reason: &str) -> bool {
let Some(target_budget) =
context_input_budget_for_provider(self.api_provider, &self.session.model)
else {
return false;
};
let id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]);
let start_message = format!("Emergency context compaction started ({reason})");
self.emit_compaction_started(id.clone(), true, start_message)
.await;
let before_tokens = self.estimated_input_tokens();
let before_count = self.session.messages.len();
let mut retries_used = 0u32;
let mut summary_prompt = None;
// Starts as a copy of the current history so that a failed API pass leaves
// the messages unchanged before the local trim.
let mut compacted_messages: Vec<Message> = self.session.messages.clone().into();
// Force compaction on and clamp the threshold to just under the target
// budget (but at least 1), regardless of the user's configuration.
let mut forced_config = self.config.compaction.clone();
forced_config.enabled = true;
forced_config.token_threshold = forced_config
.token_threshold
.min(target_budget.saturating_sub(1))
.max(1);
match compact_messages_safe(
client,
&self.session.messages,
&forced_config,
Some(&self.session.workspace),
None,
None,
)
.await
{
Ok(result) => {
retries_used = result.retries_used;
compacted_messages = result.messages;
summary_prompt = result.summary_prompt;
}
Err(err) => {
// Not fatal: report it and rely on the local trim below.
let _ = self
.tx_event
.send(Event::status(format!(
"Emergency compaction API pass failed: {err}. Falling back to local trim."
)))
.await;
}
}
// Do not replace a non-empty history with an empty compaction result.
if !compacted_messages.is_empty() || self.session.messages.is_empty() {
self.session.messages = compacted_messages.into();
}
self.merge_compaction_summary(summary_prompt);
let trimmed = self.trim_oldest_messages_to_budget(target_budget);
self.emit_session_updated().await;
let after_tokens = self.estimated_input_tokens();
let after_count = self.session.messages.len();
// Success requires fitting the budget AND evidence that something shrank.
let recovered = after_tokens <= target_budget
&& (after_tokens < before_tokens || after_count < before_count || trimmed > 0);
if recovered {
let removed = before_count.saturating_sub(after_count);
let mut details = format!(
"Emergency compaction complete: {before_count} → {after_count} messages ({removed} removed), ~{before_tokens} → ~{after_tokens} tokens"
);
if retries_used > 0 {
details.push_str(&format!(" ({retries_used} retries)"));
}
if trimmed > 0 {
details.push_str(&format!(", trimmed {trimmed} oldest"));
}
self.emit_compaction_completed(
id,
true,
details.clone(),
Some(before_count),
Some(after_count),
)
.await;
let _ = self.tx_event.send(Event::status(details)).await;
return true;
}
let message = format!(
"Emergency context compaction failed to reduce request below model limit \
(estimate ~{after_tokens} tokens, budget ~{target_budget})."
);
self.emit_compaction_failed(id, true, message.clone()).await;
let _ = self.tx_event.send(Event::status(message)).await;
false
}
fn build_tool_context(&self, mode: AppMode, auto_approve: bool) -> ToolContext {
let trusted = crate::workspace_trust::WorkspaceTrust::load_for(&self.session.workspace);
let mut trusted_external_paths = trusted.paths().to_vec();
let clipboard_images_dir =
crate::tui::clipboard::clipboard_images_dir(&self.session.workspace);
if !trusted_external_paths
.iter()
.any(|path| path == &clipboard_images_dir)
{
trusted_external_paths.push(clipboard_images_dir);
}
let mut ctx = ToolContext::with_auto_approve(
self.session.workspace.clone(),
self.session.trust_mode,
self.session.notes_path.clone(),
self.session.mcp_config_path.clone(),
mode == AppMode::Yolo || auto_approve,
)
.with_state_namespace(self.session.id.clone())
.with_features(self.config.features.clone())
.with_shell_manager(self.shell_manager.clone())
.with_runtime_services(self.config.runtime_services.clone())
.with_session_objects(crate::rlm::session::SessionObjectSnapshot::new(
self.session.id.clone(),
self.session.model.clone(),
self.session.workspace.clone(),
self.session.system_prompt.clone(),
self.session.messages.clone().into(),
))
.with_cancel_token(self.cancel_token.clone())
.with_trusted_external_paths(trusted_external_paths);
if self.config.memory_enabled {
ctx.memory_path = Some(self.config.memory_path.clone());
}
if let Some(decider) = self.config.network_policy.as_ref() {
ctx = ctx.with_network_policy(decider.clone());
}
if let Some(workshop_cfg) = self.config.workshop.as_ref()
&& let Some(vars_arc) = self.workshop_vars.as_ref()
{
let router =
crate::tools::large_output_router::LargeOutputRouter::new(workshop_cfg.clone());
ctx = ctx.with_large_output_router(router, vars_arc.clone());
}
if let Some(backend) = self.sandbox_backend.as_ref() {
ctx = ctx.with_sandbox_backend(std::sync::Arc::clone(backend));
}
ctx.search_provider = self.config.search_provider;
ctx.search_api_key = self.config.search_api_key.clone();
ctx.search_base_url = self.config.search_base_url.clone();
let policy = sandbox_policy_for_mode(mode, &self.session.workspace);
let mut ctx = ctx.with_elevated_sandbox_policy(policy);
if matches!(mode, AppMode::Plan) {
ctx = ctx.with_shell_network_denied_hint(
"Shell command blocked: Plan mode runs shell commands in a read-only sandbox — no writes, no network. Use Agent mode (`/mode agent`) for any command that creates or modifies files, or that needs network access.",
);
}
ctx
}
/// Returns the shared MCP pool, creating and caching it on first use.
///
/// A new pool is loaded from the session's MCP config path and workspace,
/// and is given the configured network policy when one is set.
///
/// # Errors
///
/// Returns `ToolError::execution_failed` when the MCP config cannot be
/// loaded.
async fn ensure_mcp_pool(&mut self) -> Result<Arc<AsyncMutex<McpPool>>, ToolError> {
    if let Some(existing) = &self.mcp_pool {
        return Ok(Arc::clone(existing));
    }

    let loaded = match McpPool::from_config_path_with_workspace(
        &self.session.mcp_config_path,
        &self.session.workspace,
    ) {
        Ok(loaded) => loaded,
        Err(e) => {
            return Err(ToolError::execution_failed(format!(
                "Failed to load MCP config: {e}"
            )));
        }
    };
    let configured = match self.config.network_policy.as_ref() {
        Some(decider) => loaded.with_network_policy(decider.clone()),
        None => loaded,
    };

    let shared = Arc::new(AsyncMutex::new(configured));
    self.mcp_pool = Some(Arc::clone(&shared));
    Ok(shared)
}
/// Connects all MCP servers and returns their tools in API form.
///
/// If the pool cannot be created, the error is reported as a status event
/// and an empty list is returned. Per-server connection failures are
/// reported as status events; the tools the pool reports are still
/// returned.
async fn mcp_tools(&mut self) -> Vec<Tool> {
    let shared_pool = match self.ensure_mcp_pool().await {
        Ok(shared_pool) => shared_pool,
        Err(err) => {
            let notice = Event::status(format!("{err:#}"));
            let _ = self.tx_event.send(notice).await;
            return Vec::new();
        }
    };

    let mut guard = shared_pool.lock().await;
    let failures = guard.connect_all().await;
    for (server, err) in failures {
        let notice = Event::status(format!(
            "Failed to connect MCP server '{server}': {err:#}"
        ));
        let _ = self.tx_event.send(notice).await;
    }
    guard.to_api_tools()
}
/// Produces a context "seam" for the older part of the conversation when the
/// seam manager decides the estimated input size warrants one.
///
/// Does nothing when there is no seam manager, when it is disabled, when no
/// seam level applies to the current token estimate, or when the verbatim
/// window starts at message 0 (nothing older to cover). If earlier seam
/// texts exist they are recompacted together with the covered messages;
/// otherwise a fresh soft seam is produced. A non-empty result is appended
/// to the session as an `assistant` message. Failures are logged and
/// otherwise ignored.
#[allow(clippy::too_many_lines)]
async fn layered_context_checkpoint(&mut self) {
    // Bail out early unless a seam manager exists and is enabled.
    let seams_enabled = self
        .seam_manager
        .as_ref()
        .is_some_and(|manager| manager.config().enabled);
    if !seams_enabled {
        return;
    }
    // The estimate needs `&mut self`, so take it before borrowing the manager.
    let estimated_tokens = self.estimated_input_tokens();
    let Some(seam_mgr) = self.seam_manager.as_ref() else {
        return;
    };
    let highest = seam_mgr.highest_level().await;
    let Some(level) = seam_mgr.seam_level_for(estimated_tokens, highest) else {
        return;
    };
    let msg_count = self.session.messages.len();
    let verbatim_start = seam_mgr.verbatim_window_start(msg_count);
    if verbatim_start == 0 {
        // No messages precede the verbatim window.
        return;
    }
    let msg_range_end = verbatim_start;
    let pinned = self
        .session
        .working_set
        .pinned_message_indices(&self.session.messages, &self.session.workspace);
    let _ = self
        .tx_event
        .send(Event::status(format!(
            "⏻ producing L{level} context seam ({msg_range_end} messages)…"
        )))
        .await;
    let existing_seams = seam_mgr.collect_seam_texts(&self.session.messages).await;
    let seam_text = if existing_seams.is_empty() {
        match seam_mgr
            .produce_soft_seam(
                &self.session.messages,
                level,
                0,
                msg_range_end,
                Some(&self.session.workspace),
                &pinned,
            )
            .await
        {
            Ok(text) => text,
            Err(err) => {
                crate::logging::warn(format!("L{level} soft seam failed: {err}"));
                return;
            }
        }
    } else {
        let recent: Vec<&Message> = (0..msg_range_end)
            .filter_map(|i| self.session.messages.get(i))
            .collect();
        match seam_mgr
            .recompact(&existing_seams, &recent, level, 0, msg_range_end)
            .await
        {
            Ok(text) => text,
            Err(err) => {
                crate::logging::warn(format!("L{level} recompact failed: {err}"));
                return;
            }
        }
    };
    if seam_text.is_empty() {
        return;
    }
    // Read the count before the `&mut self` call below ends the manager borrow.
    let seam_count = seam_mgr.seam_count().await;
    self.add_session_message(Message {
        role: "assistant".to_string(),
        content: vec![ContentBlock::Text {
            text: seam_text,
            cache_control: None,
        }],
    })
    .await;
    let _ = self
        .tx_event
        .send(Event::status(format!(
            "⏻ L{level} seam complete ({seam_count} total, {msg_range_end} messages covered)"
        )))
        .await;
}
/// Rebuilds the session's system prompt from the current configuration.
///
/// The prompt is assembled from the mode/context prompt builder, merged with
/// the stored compaction summary, and extended with the slop-ledger gate
/// block when one is available and the prompt is plain text. The session's
/// prompt is only replaced when its hash differs from the last stored hash.
///
/// When the session carries an explicit system prompt override, nothing is
/// rebuilt: the method returns before assembling anything, since the result
/// would be discarded anyway.
fn refresh_system_prompt(&mut self) {
    // An override wins unconditionally, so skip the assembly work entirely.
    if self.session.system_prompt_override {
        return;
    }
    let user_memory_block =
        crate::memory::compose_block(self.config.memory_enabled, &self.config.memory_path);
    let prompt_goal_objective = goal_objective_for_prompt(
        self.config.goal_objective.as_deref(),
        &self.config.goal_state,
    );
    let base = prompts::system_prompt_for_mode_with_context_skills_session_and_approval(
        &self.config.workspace,
        None,
        Some(&self.config.skills_dir),
        Some(&self.config.instructions),
        prompts::PromptSessionContext {
            user_memory_block: user_memory_block.as_deref(),
            goal_objective: prompt_goal_objective.as_deref(),
            project_context_pack_enabled: self.config.project_context_pack_enabled,
            locale_tag: &self.config.locale_tag,
            translation_enabled: self.config.translation_enabled,
            model_id: &self.config.model,
            context_window_override: Some(
                crate::config::provider_capability(self.api_provider, &self.config.model)
                    .context_window,
            ),
            show_thinking: self.config.show_thinking,
            verbosity: self.config.verbosity.as_deref(),
        },
    );
    let mut stable_prompt =
        merge_system_prompts(Some(&base), self.session.compaction_summary_prompt.clone());
    // The gate block is only appended to plain-text prompts.
    let gate_block = self.slop_ledger_gate_block();
    if let Some(ref block) = gate_block
        && let Some(SystemPrompt::Text(prompt_text)) = &mut stable_prompt
    {
        prompt_text.push_str("\n\n");
        prompt_text.push_str(block);
    }
    let stable_hash = system_prompt_hash(stable_prompt.as_ref());
    // Leave the stored prompt untouched when the content did not change.
    if self.session.last_system_prompt_hash != Some(stable_hash) {
        self.session.system_prompt = stable_prompt;
        self.session.last_system_prompt_hash = Some(stable_hash);
    }
}
fn slop_ledger_gate_block(&mut self) -> Option<String> {
let modified = crate::slop_ledger::SlopLedger::default_path()
.ok()
.and_then(|path| std::fs::metadata(path).ok())
.and_then(|metadata| metadata.modified().ok());
if let Some((cached_modified, cached_block)) = &self.slop_ledger_gate_cache
&& *cached_modified == modified
{
return cached_block.clone();
}
let loaded = crate::slop_ledger::SlopLedger::load()
.ok()
.and_then(|ledger| {
if ledger.has_open_entries() {
ledger.completion_gate_summary()
} else {
None
}
});
self.slop_ledger_gate_cache = Some((modified, loaded.clone()));
loaded
}
/// Folds a compaction summary prompt into the session.
///
/// With `None` this is a no-op. Otherwise the summary is merged into both
/// the stored compaction summary prompt and the active system prompt, and
/// the stored prompt hash is updated to match the new system prompt.
fn merge_compaction_summary(&mut self, summary_prompt: Option<SystemPrompt>) {
    let Some(summary) = summary_prompt else {
        return;
    };

    let accumulated = merge_system_prompts(
        self.session.compaction_summary_prompt.as_ref(),
        Some(summary.clone()),
    );
    self.session.compaction_summary_prompt = accumulated;

    let merged = merge_system_prompts(self.session.system_prompt.as_ref(), Some(summary));
    let merged_hash = system_prompt_hash(merged.as_ref());
    self.session.last_system_prompt_hash = Some(merged_hash);
    self.session.system_prompt = merged;
}
}
/// Returns the default plugin tools directory: `<codewhale home>/tools`.
///
/// When the codewhale home cannot be resolved, falls back to `.codewhale`
/// under the user's home directory, or to a relative `.codewhale` when no
/// home directory is known.
fn default_plugin_tools_dir() -> PathBuf {
    let base_dir = match codewhale_config::codewhale_home() {
        Ok(dir) => dir,
        Err(_) => match dirs::home_dir() {
            Some(user_home) => user_home.join(".codewhale"),
            None => PathBuf::from(".codewhale"),
        },
    };
    base_dir.join("tools")
}
fn plugin_tools_dir(tools_config: Option<&crate::config::ToolsConfig>) -> PathBuf {
if let Some(tools_config) = tools_config
&& let Some(custom_dir) = tools_config.plugin_dir.as_deref()
{
return PathBuf::from(shellexpand::tilde(custom_dir).as_ref());
}
default_plugin_tools_dir()
}
fn configure_plugin_tools(
tool_registry: &mut crate::tools::ToolRegistry,
tools_config: Option<&crate::config::ToolsConfig>,
) -> std::collections::HashSet<String> {
let names_before: std::collections::HashSet<String> = tool_registry
.names()
.into_iter()
.map(|s| s.to_string())
.collect();
let plugin_dir = plugin_tools_dir(tools_config);
tool_registry.load_plugins(&plugin_dir);
if let Some(tools_config) = tools_config
&& let Some(ref overrides) = tools_config.overrides
{
tool_registry.apply_overrides(overrides, &plugin_dir);
}
let names_after: std::collections::HashSet<String> = tool_registry
.names()
.into_iter()
.map(|s| s.to_string())
.collect();
&names_after - &names_before
}
/// Computes a hash of a system prompt for change detection.
///
/// Each shape (`Text`, `Blocks`, `None`) is prefixed with a distinct tag
/// byte. For blocks, the type, text and cache-control type of every block
/// are hashed in order.
///
/// The cache control is hashed as an `Option`, so "absent" and "present"
/// contribute different data. Hashing it only when present would let two
/// different block lists feed identical data to the hasher, e.g.
/// `[(a, b, Some(c)), (d, e, None)]` and `[(a, b, None), (c, d, Some(e))]`.
fn system_prompt_hash(prompt: Option<&SystemPrompt>) -> u64 {
    let mut hasher = DefaultHasher::new();
    match prompt {
        Some(SystemPrompt::Text(text)) => {
            0u8.hash(&mut hasher);
            text.hash(&mut hasher);
        }
        Some(SystemPrompt::Blocks(blocks)) => {
            1u8.hash(&mut hasher);
            for block in blocks {
                block.block_type.hash(&mut hasher);
                block.text.hash(&mut hasher);
                // `Option`'s `Hash` impl includes the variant, keeping the
                // per-block encoding unambiguous.
                block
                    .cache_control
                    .as_ref()
                    .map(|control| &control.cache_type)
                    .hash(&mut hasher);
            }
        }
        None => {
            2u8.hash(&mut hasher);
        }
    }
    hasher.finish()
}
/// Normalizes an optional goal objective: trims surrounding whitespace and
/// treats an empty result (or `None`) as "no objective".
fn normalized_goal_objective(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Pushes the host-provided goal (objective, token budget, status) into the
/// shared goal state. A poisoned lock is logged and the update is skipped.
fn sync_goal_state_from_host(
    goal_state: &SharedGoalState,
    objective: Option<&str>,
    token_budget: Option<u32>,
    status: GoalStatus,
) {
    let mut guard = match goal_state.lock() {
        Ok(guard) => guard,
        Err(err) => {
            tracing::warn!("goal state lock poisoned while syncing host goal: {err}");
            return;
        }
    };
    guard.sync_from_host_status(objective, token_budget, status);
}
/// Picks the goal objective to show in the system prompt.
///
/// When the shared goal state holds an objective, that decides the outcome:
/// the objective is returned if the state is active, otherwise `None`. When
/// the state holds no objective, or its lock is poisoned (logged), the
/// normalized `configured_goal` is used instead.
fn goal_objective_for_prompt(
    configured_goal: Option<&str>,
    goal_state: &SharedGoalState,
) -> Option<String> {
    // Outer `Some` means the shared state has an objective and is
    // authoritative; the inner option is the prompt value it dictates.
    let from_state: Option<Option<String>> = match goal_state.lock() {
        Ok(state) => state
            .objective()
            .map(|objective| state.is_active().then(|| objective.to_string())),
        Err(err) => {
            tracing::warn!("goal state lock poisoned while building prompt: {err}");
            None
        }
    };
    from_state.unwrap_or_else(|| normalized_goal_objective(configured_goal))
}
/// Maps an app mode to its effective approval mode: Yolo is always `Auto`,
/// Plan is always `Never`, and Agent uses the session's own setting.
fn approval_mode_for(
    mode: AppMode,
    session_approval: crate::tui::approval::ApprovalMode,
) -> crate::tui::approval::ApprovalMode {
    use crate::tui::approval::ApprovalMode;

    match mode {
        AppMode::Agent => session_approval,
        AppMode::Plan => ApprovalMode::Never,
        AppMode::Yolo => ApprovalMode::Auto,
    }
}
/// Returns the approval mode for an agent turn: `Auto` when `auto_approve`
/// is set, otherwise the given `approval_mode` unchanged.
fn agent_approval_mode_for_turn(
    auto_approve: bool,
    approval_mode: crate::tui::approval::ApprovalMode,
) -> crate::tui::approval::ApprovalMode {
    if !auto_approve {
        return approval_mode;
    }
    crate::tui::approval::ApprovalMode::Auto
}
/// Renders the self-closing `<runtime_prompt …/>` tag carrying the mode,
/// approval mode and `allow_shell` flag as attributes.
fn runtime_prompt_text(
    mode: AppMode,
    approval_mode: crate::tui::approval::ApprovalMode,
    allow_shell: bool,
) -> String {
    use crate::tui::approval::ApprovalMode;

    let approval_str = match approval_mode {
        ApprovalMode::Never => "never",
        ApprovalMode::Suggest => "suggest",
        ApprovalMode::Auto => "auto",
    };
    let mode_str = match mode {
        AppMode::Yolo => "yolo",
        AppMode::Plan => "plan",
        AppMode::Agent => "agent",
    };
    format!(
        "<runtime_prompt visibility=\"internal\" mode=\"{mode_str}\" approval=\"{approval_str}\" allow_shell=\"{allow_shell}\"/>"
    )
}
/// Creates an engine and starts its event loop as a supervised task.
///
/// `Engine::new` yields the engine together with its [`EngineHandle`]; the
/// engine's `run()` future is handed to `spawn_supervised` under the task name
/// `"engine-event-loop"`, and the handle is returned to the caller.
///
/// This function is not `#[track_caller]`, so the `Location` passed along
/// points at the `Location::caller()` call inside this function rather than at
/// whoever called `spawn_engine`.
pub fn spawn_engine(config: EngineConfig, api_config: &Config) -> EngineHandle {
    let (engine, handle) = Engine::new(config, api_config);
    let event_loop = async move {
        engine.run().await;
    };
    spawn_supervised(
        "engine-event-loop",
        std::panic::Location::caller(),
        event_loop,
    );
    handle
}
/// Test-only bundle pairing an `EngineHandle` with the opposite ends of its channels.
///
/// Built by `mock_engine_handle`, which wires every channel of the handle to
/// one of the fields below without constructing an `Engine`.
#[cfg(test)]
pub(crate) struct MockEngineHandle {
/// The handle under test.
pub handle: EngineHandle,
/// Receiving end of the channel fed by `handle`'s `tx_op`.
pub rx_op: mpsc::Receiver<Op>,
// Receiving end of the channel fed by `handle`'s `tx_approval`. Kept private;
// tests read it through `recv_approval_event`.
rx_approval: mpsc::Receiver<ApprovalDecision>,
/// Receiving end of the channel fed by `handle`'s `tx_steer`.
pub rx_steer: mpsc::Receiver<String>,
/// Sending end of the channel drained by `handle`'s `rx_event`.
pub tx_event: mpsc::Sender<Event>,
/// Clone of the token initially stored in `handle`'s shared `cancel_token`.
pub cancel_token: CancellationToken,
}
/// Test-facing counterpart of `ApprovalDecision`, produced by
/// `MockEngineHandle::recv_approval_event`.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq`, so values can be compared
/// directly in test assertions.
#[cfg(test)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum MockApprovalEvent {
/// Counterpart of `ApprovalDecision::Approved`; `id` is carried over as-is.
Approved {
id: String,
},
/// Counterpart of `ApprovalDecision::Denied`; `id` is carried over as-is.
Denied {
id: String,
},
/// Counterpart of `ApprovalDecision::RetryWithPolicy`; `id` and the sandbox
/// `policy` are carried over as-is.
RetryWithPolicy {
id: String,
policy: crate::sandbox::SandboxPolicy,
},
}
#[cfg(test)]
impl MockEngineHandle {
    /// Waits for the next approval decision and converts it to a [`MockApprovalEvent`].
    ///
    /// Each `ApprovalDecision` variant maps to the variant of the same name,
    /// with its fields moved across unchanged. Returns `None` when `recv` on
    /// the approval channel returns `None`.
    pub(crate) async fn recv_approval_event(&mut self) -> Option<MockApprovalEvent> {
        let decision = self.rx_approval.recv().await?;
        let event = match decision {
            ApprovalDecision::RetryWithPolicy { id, policy } => {
                MockApprovalEvent::RetryWithPolicy { id, policy }
            }
            ApprovalDecision::Denied { id } => MockApprovalEvent::Denied { id },
            ApprovalDecision::Approved { id } => MockApprovalEvent::Approved { id },
        };
        Some(event)
    }
}
/// Builds an [`EngineHandle`] wired to in-memory channels, with no engine behind it.
///
/// The far end of each channel is returned alongside the handle in a
/// [`MockEngineHandle`]. The exception is the user-input channel: its receiver
/// is dropped when this function returns.
///
/// The handle starts out with no cancel reason and `shared_paused` set to
/// `false`. The returned `cancel_token` is a clone of the token stored inside
/// the handle.
#[cfg(test)]
pub(crate) fn mock_engine_handle() -> MockEngineHandle {
    let (op_tx, op_rx) = mpsc::channel(32);
    let (event_tx, event_rx) = mpsc::channel(256);
    let (approval_tx, approval_rx) = mpsc::channel(64);
    let (user_input_tx, _user_input_rx) = mpsc::channel(32);
    let (steer_tx, steer_rx) = mpsc::channel(64);
    let token = CancellationToken::new();
    let no_reason: Option<CancelReason> = None;

    MockEngineHandle {
        handle: EngineHandle {
            tx_op: op_tx,
            rx_event: Arc::new(RwLock::new(event_rx)),
            cancel_token: Arc::new(StdMutex::new(token.clone())),
            cancel_reason: Arc::new(StdMutex::new(no_reason)),
            tx_approval: approval_tx,
            tx_user_input: user_input_tx,
            tx_steer: steer_tx,
            shared_paused: Arc::new(StdMutex::new(false)),
        },
        rx_op: op_rx,
        rx_approval: approval_rx,
        rx_steer: steer_rx,
        tx_event: event_tx,
        cancel_token: token,
    }
}
mod approval;
mod capacity_flow;
mod context;
mod handle;
pub(crate) use context::compact_tool_result_for_context;
use context::{
COMPACTION_SUMMARY_MARKER, MAX_CONTEXT_RECOVERY_ATTEMPTS, MIN_RECENT_MESSAGES_TO_KEEP,
context_input_budget_for_provider, effective_max_output_tokens,
extract_compaction_summary_prompt, is_context_length_error_message, summarize_text,
};
mod dispatch;
mod loop_guard;
mod lsp_hooks;
mod streaming;
mod token_estimate_cache;
mod tool_catalog;
mod tool_execution;
mod tool_setup;
mod turn_loop;
pub(crate) use token_estimate_cache::TokenEstimateCache;
// Visible to the parent module only (`pub(super)`).
// NOTE(review): judging by the name, this caps how many shell executions may
// run in parallel — its users are not in this part of the file; confirm there.
pub(super) const MAX_PARALLEL_SHELL_EXEC: usize = 4;
/// Returns `tool_catalog::DEFAULT_ACTIVE_NATIVE_TOOLS` as a `'static` slice of names.
///
/// Crate-visible accessor for a constant that lives in the private
/// `tool_catalog` submodule.
pub(crate) fn default_active_native_tool_names() -> &'static [&'static str] {
tool_catalog::DEFAULT_ACTIVE_NATIVE_TOOLS
}
/// Prunes `catalog` in place according to the allow/deny gates.
///
/// A tool is kept only if `turn_loop::command_denies_tool` does not reject its
/// name against `disallowed_tools` **and** `turn_loop::command_allows_tool`
/// accepts it against `allowed_tools`. The deny check runs first; a denied
/// tool is dropped without consulting the allow list.
fn filter_tool_catalog_for_gates(
    catalog: &mut Vec<Tool>,
    allowed_tools: Option<&[String]>,
    disallowed_tools: Option<&[String]>,
) {
    catalog.retain(|tool| {
        if turn_loop::command_denies_tool(disallowed_tools, &tool.name) {
            return false;
        }
        turn_loop::command_allows_tool(allowed_tools, &tool.name)
    });
}
use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision};
#[cfg(test)]
use self::dispatch::should_parallelize_tool_batch;
use self::dispatch::{
ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome,
ToolExecutionBatch, ToolExecutionPlan, caller_allowed_for_tool, caller_type_for_tool_use,
final_tool_input, format_tool_error, mcp_tool_approval_description, mcp_tool_is_parallel_safe,
mcp_tool_is_read_only, parse_parallel_tool_calls, parse_tool_input,
plan_tool_execution_batches, should_force_update_plan_first, should_stop_after_plan_tool,
};
use self::loop_guard::{AttemptDecision, LoopGuard, OutcomeDecision};
#[cfg(test)]
use self::lsp_hooks::edited_paths_for_tool;
#[cfg(test)]
use self::streaming::TOOL_CALL_START_MARKERS;
use self::streaming::{
ContentBlockKind, FAKE_WRAPPER_NOTICE, MAX_STREAM_ERRORS_BEFORE_FAIL, MAX_STREAM_RETRIES,
MAX_TRANSPARENT_STREAM_RETRIES, STREAM_MAX_CONTENT_BYTES, STREAM_MAX_DURATION_SECS,
ToolUseState, contains_fake_tool_wrapper, filter_tool_call_delta, should_resume_after_sleep,
should_transparently_retry_stream, sleep_gap_detected,
};
use self::tool_catalog::{
CODE_EXECUTION_TOOL_NAME, JS_EXECUTION_TOOL_NAME, MULTI_TOOL_PARALLEL_NAME,
REQUEST_USER_INPUT_NAME, active_tools_for_step, apply_provider_tool_policy,
build_model_tool_catalog, ensure_advanced_tooling, execute_code_execution_tool,
execute_tool_search, initial_active_tools, is_tool_search_tool,
maybe_hydrate_requested_deferred_tool, missing_tool_error_message,
};
#[cfg(test)]
use self::tool_catalog::{
TOOL_SEARCH_BM25_NAME, TOOL_SEARCH_REGEX_NAME, maybe_activate_requested_deferred_tool,
preflight_requested_deferred_tool, should_default_defer_tool,
};
use self::tool_execution::emit_tool_audit;
use self::tool_setup::sandbox_policy_for_mode;
use crate::tools::js_execution::execute_js_execution_tool;
#[cfg(test)]
mod tests;