use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use anyhow::Result;
use futures_util::StreamExt;
use futures_util::stream::FuturesUnordered;
use serde_json::json;
use tokio::sync::{Mutex as AsyncMutex, RwLock, mpsc};
use tokio_util::sync::CancellationToken;
use crate::client::DeepSeekClient;
use crate::compaction::{
CompactionConfig, compact_messages_safe, merge_system_prompts, should_compact,
};
use crate::config::{ApiProvider, Config, DEFAULT_MAX_SUBAGENTS, DEFAULT_TEXT_MODEL};
use crate::cycle_manager::{
CycleBriefing, CycleConfig, StructuredState, archive_cycle, build_seed_messages,
estimate_briefing_tokens, produce_briefing, should_advance_cycle,
};
use crate::error_taxonomy::{ErrorCategory, ErrorEnvelope, StreamError};
use crate::features::{Feature, Features};
use crate::llm_client::LlmClient;
use crate::mcp::McpPool;
#[cfg(test)]
use crate::models::ToolCaller;
use crate::models::{
ContentBlock, ContentBlockStart, Delta, LEGACY_DEEPSEEK_CONTEXT_WINDOW_TOKENS, Message,
MessageRequest, StreamEvent, SystemPrompt, Tool, Usage,
};
use crate::prompts;
use crate::seam_manager::{SeamConfig, SeamManager};
use crate::tools::plan::{SharedPlanState, new_shared_plan_state};
use crate::tools::shell::{SharedShellManager, new_shared_shell_manager};
use crate::tools::spec::RuntimeToolServices;
use crate::tools::spec::{ApprovalRequirement, ToolError, ToolResult};
use crate::tools::subagent::{
Mailbox, SharedSubAgentManager, SubAgentCompletion, SubAgentForkContext, SubAgentRuntime,
SubAgentType, new_shared_subagent_manager, resolve_subagent_assignment_route,
};
use crate::tools::todo::{SharedTodoList, new_shared_todo_list};
use crate::tools::user_input::{UserInputRequest, UserInputResponse};
use crate::tools::{ToolContext, ToolRegistryBuilder};
use crate::tui::app::AppMode;
use crate::utils::spawn_supervised;
use super::capacity::{
CapacityController, CapacityControllerConfig, CapacityDecision, CapacityObservationInput,
CapacitySnapshot, GuardrailAction, RiskBand,
};
use super::capacity_memory::{
CanonicalState, CapacityMemoryRecord, ReplayInfo, append_capacity_record,
load_last_k_capacity_records, new_record_id, now_rfc3339,
};
use super::coherence::{CoherenceSignal, CoherenceState, next_coherence_state};
use super::events::{Event, TurnOutcomeStatus};
use super::ops::Op;
use super::session::Session;
use super::tool_parser;
use super::turn::{TurnContext, TurnToolCall, post_turn_snapshot, pre_turn_snapshot};
/// Static configuration handed to [`Engine::new`].
///
/// Several fields (`model`, `workspace`, `allow_shell`, `trust_mode`,
/// `translation_enabled`, `goal_objective`) are later kept in sync with the
/// live session as ops mutate it during `run`.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Model identifier; "auto" (case-insensitive) enables auto model routing.
    pub model: String,
    /// Root directory the engine operates in.
    pub workspace: PathBuf,
    /// Whether shell tools are available to the model.
    pub allow_shell: bool,
    /// Trust mode relaxes approval requirements (mirrored into the session).
    pub trust_mode: bool,
    /// Location of the notes file passed to the session.
    pub notes_path: PathBuf,
    /// Path to the MCP server configuration file.
    pub mcp_config_path: PathBuf,
    /// Directory containing skill definitions used in the system prompt.
    pub skills_dir: PathBuf,
    /// Extra instruction files folded into the system prompt.
    pub instructions: Vec<PathBuf>,
    pub project_context_pack_enabled: bool,
    pub translation_enabled: bool,
    /// Maximum tool-loop steps per turn (seeds each `TurnContext`).
    pub max_steps: u32,
    /// Cap on concurrently managed sub-agents.
    pub max_subagents: usize,
    /// Feature flags gating subsystems (sub-agents, MCP, ...).
    pub features: Features,
    /// Auto/manual context compaction settings.
    pub compaction: CompactionConfig,
    pub cycle: CycleConfig,
    pub capacity: CapacityControllerConfig,
    /// Shared todo list exposed to tools.
    pub todos: SharedTodoList,
    /// Shared plan state exposed to tools.
    pub plan_state: SharedPlanState,
    /// Maximum depth of nested sub-agent spawns.
    pub max_spawn_depth: u32,
    pub network_policy: Option<crate::network_policy::NetworkPolicyDecider>,
    /// When true, pre/post-turn workspace snapshots are taken.
    pub snapshots_enabled: bool,
    /// Workspace-size cap above which snapshots are skipped.
    pub snapshots_max_workspace_bytes: u64,
    /// LSP integration config; `None` disables the LSP manager.
    pub lsp_config: Option<crate::lsp::LspConfig>,
    /// Pre-wired tool services (e.g. an externally owned shell manager).
    pub runtime_services: RuntimeToolServices,
    /// Per-role model overrides applied to sub-agent runtimes.
    pub subagent_model_overrides: HashMap<String, String>,
    pub memory_enabled: bool,
    pub memory_path: PathBuf,
    pub vision_config: Option<crate::config::VisionModelConfig>,
    /// Optional standing objective injected into the system prompt.
    pub goal_objective: Option<String>,
    pub locale_tag: String,
    pub strict_tool_mode: bool,
    /// Large-output routing ("workshop") config; enables workshop variables.
    pub workshop: Option<crate::tools::large_output_router::WorkshopConfig>,
    pub search_provider: crate::config::SearchProvider,
    pub search_api_key: Option<String>,
}
impl Default for EngineConfig {
    /// Conservative defaults for local interactive use; initializers follow
    /// the struct's declaration order.
    fn default() -> Self {
        Self {
            model: DEFAULT_TEXT_MODEL.to_string(),
            workspace: PathBuf::from("."),
            allow_shell: true,
            trust_mode: false,
            notes_path: PathBuf::from("notes.txt"),
            mcp_config_path: PathBuf::from("mcp.json"),
            skills_dir: crate::skills::default_skills_dir(),
            instructions: Vec::new(),
            project_context_pack_enabled: true,
            translation_enabled: false,
            max_steps: 100,
            max_subagents: DEFAULT_MAX_SUBAGENTS,
            features: Features::with_defaults(),
            compaction: CompactionConfig::default(),
            cycle: CycleConfig::default(),
            capacity: CapacityControllerConfig::default(),
            todos: new_shared_todo_list(),
            plan_state: new_shared_plan_state(),
            max_spawn_depth: crate::tools::subagent::DEFAULT_MAX_SPAWN_DEPTH,
            network_policy: None,
            snapshots_enabled: true,
            snapshots_max_workspace_bytes:
                crate::snapshot::DEFAULT_MAX_WORKSPACE_BYTES_FOR_SNAPSHOT,
            lsp_config: None,
            runtime_services: RuntimeToolServices::default(),
            subagent_model_overrides: HashMap::new(),
            memory_enabled: false,
            memory_path: PathBuf::from("./memory.md"),
            vision_config: None,
            goal_objective: None,
            locale_tag: "en".to_string(),
            strict_tool_mode: false,
            workshop: None,
            search_provider: crate::config::SearchProvider::default(),
            search_api_key: None,
        }
    }
}
/// Why an in-flight request was cancelled; rendered via `describe`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)] // not every variant is constructed in every build
pub enum CancelReason {
    /// The user explicitly cancelled.
    User,
    /// An external caller requested cancellation.
    External,
    /// A newer turn preempted this request.
    Preempted,
    /// The engine was torn down while a decision was pending.
    Internal,
}
impl CancelReason {
    /// Human-readable explanation of this cancellation, suitable for
    /// surfacing in status/error messages.
    fn describe(self) -> &'static str {
        match self {
            Self::User => "user cancelled the request",
            Self::External => "request cancelled by external caller",
            Self::Preempted => "request was preempted by a new turn",
            Self::Internal => "engine torn down before approval resolved",
        }
    }
}
/// Cloneable caller-side handle to a running [`Engine`].
///
/// All communication goes over channels; the handle never touches engine
/// state directly.
#[derive(Clone)]
pub struct EngineHandle {
    /// Ops queued to the engine loop.
    pub tx_op: mpsc::Sender<Op>,
    /// Engine events; wrapped so multiple clones can share one receiver.
    pub rx_event: Arc<RwLock<mpsc::Receiver<Event>>>,
    // Shared slot: the engine swaps in a fresh token each turn
    // (see Engine::reset_cancel_token), so handles always see the live one.
    cancel_token: Arc<StdMutex<CancellationToken>>,
    // Last recorded reason; cleared when the token is reset.
    cancel_reason: Arc<StdMutex<Option<CancelReason>>>,
    tx_approval: mpsc::Sender<ApprovalDecision>,
    tx_user_input: mpsc::Sender<UserInputDecision>,
    tx_steer: mpsc::Sender<String>,
}
impl EngineHandle {
    /// Queue an [`Op`] for the engine loop.
    ///
    /// # Errors
    /// Fails if the engine has shut down and the channel is closed.
    pub async fn send(&self, op: Op) -> Result<()> {
        self.tx_op.send(op).await?;
        Ok(())
    }

    /// Cancel the in-flight request on behalf of the user.
    pub fn cancel(&self) {
        self.cancel_with_reason(CancelReason::User);
    }

    /// Record `reason`, then trip the currently installed cancellation token.
    ///
    /// Lock poisoning is deliberately recovered from (`into_inner`): a thread
    /// that panicked while holding either mutex must not wedge cancellation
    /// for every other handle.
    pub fn cancel_with_reason(&self, reason: CancelReason) {
        *self
            .cancel_reason
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(reason);
        self.cancel_token
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .cancel();
    }

    /// Whether the current turn's token has been cancelled.
    #[must_use]
    #[allow(dead_code)]
    pub fn is_cancelled(&self) -> bool {
        self.cancel_token
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .is_cancelled()
    }

    /// Approve the pending tool call identified by `id`.
    pub async fn approve_tool_call(&self, id: impl Into<String>) -> Result<()> {
        self.tx_approval
            .send(ApprovalDecision::Approved { id: id.into() })
            .await?;
        Ok(())
    }

    /// Deny the pending tool call identified by `id`.
    pub async fn deny_tool_call(&self, id: impl Into<String>) -> Result<()> {
        self.tx_approval
            .send(ApprovalDecision::Denied { id: id.into() })
            .await?;
        Ok(())
    }

    /// Re-run the tool call `id` under a caller-chosen sandbox policy.
    pub async fn retry_tool_with_policy(
        &self,
        id: impl Into<String>,
        policy: crate::sandbox::SandboxPolicy,
    ) -> Result<()> {
        self.tx_approval
            .send(ApprovalDecision::RetryWithPolicy {
                id: id.into(),
                policy,
            })
            .await?;
        Ok(())
    }

    /// Answer a pending user-input request.
    pub async fn submit_user_input(
        &self,
        id: impl Into<String>,
        response: UserInputResponse,
    ) -> Result<()> {
        self.tx_user_input
            .send(UserInputDecision::Submitted {
                id: id.into(),
                response,
            })
            .await?;
        Ok(())
    }

    /// Cancel a pending user-input request without answering it.
    pub async fn cancel_user_input(&self, id: impl Into<String>) -> Result<()> {
        self.tx_user_input
            .send(UserInputDecision::Cancelled { id: id.into() })
            .await?;
        Ok(())
    }

    /// Inject steering text into the current turn.
    pub async fn steer(&self, content: impl Into<String>) -> Result<()> {
        self.tx_steer.send(content.into()).await?;
        Ok(())
    }
}
/// The agent engine: owns the session, tool infrastructure, and all channel
/// endpoints, and is driven to completion by [`Engine::run`].
pub struct Engine {
    config: EngineConfig,
    // None when client construction failed; the error text is kept alongside.
    deepseek_client: Option<DeepSeekClient>,
    deepseek_client_error: Option<String>,
    // Recovery hint shown on auth failures when the API key came only from
    // the environment (see env_only_api_key_recovery_hint).
    api_key_env_only_recovery: Option<String>,
    session: Session,
    subagent_manager: SharedSubAgentManager,
    shell_manager: SharedShellManager,
    // Lazily initialized; shut down at the end of run().
    mcp_pool: Option<Arc<AsyncMutex<McpPool>>>,
    rx_op: mpsc::Receiver<Op>,
    rx_approval: mpsc::Receiver<ApprovalDecision>,
    rx_user_input: mpsc::Receiver<UserInputDecision>,
    rx_steer: mpsc::Receiver<String>,
    tx_event: mpsc::Sender<Event>,
    tx_subagent_completion: mpsc::UnboundedSender<SubAgentCompletion>,
    pub(super) rx_subagent_completion: mpsc::UnboundedReceiver<SubAgentCompletion>,
    // Engine-local copy of the current turn's token; a clone also lives in
    // shared_cancel_token so EngineHandle sees the same token.
    cancel_token: CancellationToken,
    shared_cancel_token: Arc<StdMutex<CancellationToken>>,
    pub(super) cancel_reason: Arc<StdMutex<Option<CancelReason>>>,
    tool_exec_lock: Arc<RwLock<()>>,
    capacity_controller: CapacityController,
    // Only present when an API client exists (see Engine::new).
    seam_manager: Option<SeamManager>,
    coherence_state: CoherenceState,
    // Monotonic turn counter; also used as the snapshot sequence number.
    turn_counter: u64,
    lsp_manager: Arc<crate::lsp::LspManager>,
    // Present only when config.workshop is set.
    workshop_vars: Option<
        std::sync::Arc<tokio::sync::Mutex<crate::tools::large_output_router::WorkshopVariables>>,
    >,
    sandbox_backend: Option<std::sync::Arc<dyn crate::sandbox::backend::SandboxBackend>>,
    pending_lsp_blocks: Vec<crate::lsp::DiagnosticBlock>,
}
impl Engine {
/// Install a fresh [`CancellationToken`] for the next turn — both locally and
/// in the handle-visible shared slot — and clear any recorded cancel reason.
///
/// Poisoned locks are recovered (`into_inner`) so a panicked holder cannot
/// permanently disable cancellation.
fn reset_cancel_token(&mut self) {
    let token = CancellationToken::new();
    self.cancel_token = token.clone();
    *self
        .shared_cancel_token
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = token;
    *self
        .cancel_reason
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = None;
}
/// Build a recovery hint for auth failures when the active provider's API key
/// comes only from the environment (no saved config key), naming the env var
/// responsible. Returns `None` when a saved key exists or the provider does
/// not apply.
fn env_only_api_key_recovery_hint(api_config: &Config) -> Option<String> {
    crate::config::active_provider_uses_env_only_api_key(api_config).then(|| {
        let provider = api_config.api_provider();
        // Map each provider to the env var(s) its key is read from.
        let env_var = match provider {
            ApiProvider::Deepseek | ApiProvider::DeepseekCN => "DEEPSEEK_API_KEY",
            ApiProvider::NvidiaNim => "NVIDIA_API_KEY/NVIDIA_NIM_API_KEY",
            ApiProvider::Openai => "OPENAI_API_KEY",
            ApiProvider::Atlascloud => "ATLASCLOUD_API_KEY",
            ApiProvider::Openrouter => "OPENROUTER_API_KEY",
            ApiProvider::Novita => "NOVITA_API_KEY",
            ApiProvider::Fireworks => "FIREWORKS_API_KEY",
            ApiProvider::Sglang => "SGLANG_API_KEY",
            ApiProvider::Vllm => "VLLM_API_KEY",
            ApiProvider::Ollama => "OLLAMA_API_KEY",
        };
        format!(
            "The rejected key came from {env_var}; no saved config key is present.\n\
             Run `deepseek auth status` to inspect credential sources, then \
             `deepseek auth set --provider {provider}` to save a valid key in ~/.deepseek/config.toml, \
             or remove the stale export and open a fresh shell.",
            provider = provider.as_str()
        )
    })
}
/// Append the env-only API-key recovery hint to authentication errors.
///
/// The message passes through untouched when no hint is configured, when it
/// is not classified as an authentication error, or when it already contains
/// the hint's marker text (avoids double-appending).
pub(super) fn decorate_auth_error_message(&self, message: String) -> String {
    match self.api_key_env_only_recovery.as_ref() {
        Some(hint)
            if crate::error_taxonomy::classify_error_message(&message)
                == ErrorCategory::Authentication
                && !message.contains("no saved config key is present") =>
        {
            format!("{message}\n\n{hint}")
        }
        _ => message,
    }
}
/// Construct an [`Engine`] plus its caller-side [`EngineHandle`].
///
/// Wires up every channel pair, builds the API client (failure is recorded,
/// not fatal — ops that need the client report it later), composes the
/// initial system prompt, and instantiates tool/sub-agent/LSP/sandbox
/// infrastructure according to `config` and `api_config`.
pub fn new(config: EngineConfig, api_config: &Config) -> (Self, EngineHandle) {
    let (tx_op, rx_op) = mpsc::channel(32);
    let (tx_event, rx_event) = mpsc::channel(256);
    let (tx_approval, rx_approval) = mpsc::channel(64);
    let (tx_user_input, rx_user_input) = mpsc::channel(32);
    let (tx_steer, rx_steer) = mpsc::channel(64);
    let (tx_subagent_completion, rx_subagent_completion) = mpsc::unbounded_channel();
    let cancel_token = CancellationToken::new();
    // Shared slot so the handle always observes the current turn's token.
    let shared_cancel_token = Arc::new(StdMutex::new(cancel_token.clone()));
    let cancel_reason: Arc<StdMutex<Option<CancelReason>>> = Arc::new(StdMutex::new(None));
    let tool_exec_lock = Arc::new(RwLock::new(()));
    // Client construction may fail (e.g. missing key); keep the error text so
    // later ops can surface it instead of panicking here.
    let (deepseek_client, deepseek_client_error) = match DeepSeekClient::new(api_config) {
        Ok(client) => (Some(client), None),
        Err(err) => (None, Some(err.to_string())),
    };
    let api_key_env_only_recovery = Self::env_only_api_key_recovery_hint(api_config);
    let mut session = Session::new(
        config.model.clone(),
        config.workspace.clone(),
        config.allow_shell,
        config.trust_mode,
        config.notes_path.clone(),
        config.mcp_config_path.clone(),
    );
    let user_memory_block =
        crate::memory::compose_block(config.memory_enabled, &config.memory_path);
    // Compose the initial system prompt from mode, workspace, skills,
    // instructions, and session context.
    let system_prompt =
        prompts::system_prompt_for_mode_with_context_skills_session_and_approval(
            AppMode::Agent,
            &config.workspace,
            None,
            Some(&config.skills_dir),
            Some(&config.instructions),
            prompts::PromptSessionContext {
                user_memory_block: user_memory_block.as_deref(),
                goal_objective: config.goal_objective.as_deref(),
                project_context_pack_enabled: config.project_context_pack_enabled,
                locale_tag: &config.locale_tag,
                translation_enabled: config.translation_enabled,
            },
            session.approval_mode,
        );
    let stable_prompt = Some(system_prompt);
    // Hash recorded so later prompt refreshes can detect changes.
    session.last_system_prompt_hash = Some(system_prompt_hash(stable_prompt.as_ref()));
    session.system_prompt = stable_prompt;
    let subagent_manager =
        new_shared_subagent_manager(config.workspace.clone(), config.max_subagents);
    // Prefer an externally supplied shell manager; otherwise create one
    // rooted at the workspace.
    let shell_manager = config
        .runtime_services
        .shell_manager
        .clone()
        .unwrap_or_else(|| new_shared_shell_manager(config.workspace.clone()));
    let capacity_controller = CapacityController::new(config.capacity.clone());
    // The seam manager needs a working API client; absent one it stays None.
    let seam_manager = deepseek_client.as_ref().map(|main_client| {
        let seam_config = SeamConfig {
            enabled: api_config.context.enabled.unwrap_or(false),
            verbatim_window_turns: api_config
                .context
                .verbatim_window_turns
                .unwrap_or(crate::seam_manager::VERBATIM_WINDOW_TURNS),
            l1_threshold: api_config
                .context
                .l1_threshold
                .unwrap_or(crate::seam_manager::DEFAULT_L1_THRESHOLD),
            l2_threshold: api_config
                .context
                .l2_threshold
                .unwrap_or(crate::seam_manager::DEFAULT_L2_THRESHOLD),
            l3_threshold: api_config
                .context
                .l3_threshold
                .unwrap_or(crate::seam_manager::DEFAULT_L3_THRESHOLD),
            cycle_threshold: api_config
                .context
                .cycle_threshold
                .unwrap_or(crate::seam_manager::DEFAULT_CYCLE_THRESHOLD),
            seam_model: api_config
                .context
                .seam_model
                .clone()
                .unwrap_or_else(|| crate::seam_manager::DEFAULT_SEAM_MODEL.to_string()),
        };
        SeamManager::new(main_client.clone(), seam_config)
    });
    let lsp_manager = Arc::new(match config.lsp_config.clone() {
        Some(cfg) => crate::lsp::LspManager::new(cfg, config.workspace.clone()),
        None => crate::lsp::LspManager::disabled(),
    });
    // Workshop variables only exist when the workshop feature is configured.
    let workshop_vars: Option<
        std::sync::Arc<
            tokio::sync::Mutex<crate::tools::large_output_router::WorkshopVariables>,
        >,
    > = if config.workshop.is_some() {
        Some(std::sync::Arc::new(tokio::sync::Mutex::new(
            crate::tools::large_output_router::WorkshopVariables::default(),
        )))
    } else {
        None
    };
    // Sandbox backend creation failure is non-fatal: warn and run unsandboxed.
    let sandbox_backend = crate::sandbox::backend::create_backend(api_config)
        .unwrap_or_else(|e| {
            tracing::warn!("Failed to create sandbox backend: {e}");
            None
        })
        .map(std::sync::Arc::from);
    let mut engine = Engine {
        config,
        deepseek_client,
        deepseek_client_error,
        api_key_env_only_recovery,
        session,
        subagent_manager,
        shell_manager,
        mcp_pool: None,
        rx_op,
        rx_approval,
        rx_user_input,
        rx_steer,
        tx_event,
        tx_subagent_completion,
        rx_subagent_completion,
        cancel_token: cancel_token.clone(),
        shared_cancel_token: shared_cancel_token.clone(),
        cancel_reason: cancel_reason.clone(),
        tool_exec_lock,
        capacity_controller,
        seam_manager,
        coherence_state: CoherenceState::default(),
        turn_counter: 0,
        lsp_manager,
        pending_lsp_blocks: Vec::new(),
        workshop_vars,
        sandbox_backend,
    };
    // Restore any previously persisted canonical state before serving ops.
    engine.rehydrate_latest_canonical_state();
    let handle = EngineHandle {
        tx_op,
        rx_event: Arc::new(RwLock::new(rx_event)),
        cancel_token: shared_cancel_token,
        cancel_reason,
        tx_approval,
        tx_user_input,
        tx_steer,
    };
    (engine, handle)
}
/// Main engine loop: consumes [`Op`]s until the channel closes or
/// [`Op::Shutdown`] arrives, then shuts down any MCP servers.
///
/// Each op is handled to completion before the next is received; long-running
/// work (turns, compaction, RLM) therefore serializes on this loop.
#[allow(clippy::too_many_lines)]
pub async fn run(mut self) {
    while let Some(op) = self.rx_op.recv().await {
        match op {
            Op::SendMessage {
                content,
                mode,
                model,
                goal_objective,
                reasoning_effort,
                reasoning_effort_auto,
                auto_model,
                allow_shell,
                trust_mode,
                auto_approve,
                approval_mode,
                translation_enabled,
            } => {
                self.handle_send_message(
                    content,
                    mode,
                    model,
                    goal_objective,
                    reasoning_effort,
                    reasoning_effort_auto,
                    auto_model,
                    allow_shell,
                    trust_mode,
                    auto_approve,
                    approval_mode,
                    translation_enabled,
                )
                .await;
            }
            Op::CancelRequest => {
                // Trip the current token, then immediately install a fresh one
                // so the next turn starts uncancelled.
                self.cancel_token.cancel();
                self.reset_cancel_token();
            }
            Op::ApproveToolCall { id } => {
                let _ = self
                    .tx_event
                    .send(Event::status(format!("Approved tool call: {id}")))
                    .await;
            }
            Op::DenyToolCall { id } => {
                let _ = self
                    .tx_event
                    .send(Event::status(format!("Denied tool call: {id}")))
                    .await;
            }
            Op::SpawnSubAgent { prompt } => {
                // A sub-agent needs a working API client; report and skip if
                // client construction failed at startup.
                let Some(client) = self.deepseek_client.clone() else {
                    let message = self
                        .deepseek_client_error
                        .as_deref()
                        .map(|err| format!("Failed to spawn sub-agent: {err}"))
                        .unwrap_or_else(|| {
                            "Failed to spawn sub-agent: API client not configured".to_string()
                        });
                    let _ = self
                        .tx_event
                        .send(Event::error(ErrorEnvelope::fatal(message)))
                        .await;
                    continue;
                };
                let mut runtime = SubAgentRuntime::new(
                    client,
                    self.session.model.clone(),
                    self.build_tool_context(AppMode::Agent, self.session.auto_approve),
                    self.session.allow_shell,
                    Some(self.tx_event.clone()),
                    Arc::clone(&self.subagent_manager),
                )
                .with_role_models(self.config.subagent_model_overrides.clone())
                .with_auto_model(self.session.auto_model)
                .with_reasoning_effort(
                    self.session.reasoning_effort.clone(),
                    self.session.reasoning_effort_auto,
                )
                .with_max_spawn_depth(self.config.max_spawn_depth)
                .background_runtime();
                // Route may override model/effort based on the prompt.
                let route = resolve_subagent_assignment_route(&runtime, None, &prompt).await;
                runtime.model = route.model;
                runtime.reasoning_effort = route.reasoning_effort;
                runtime.reasoning_effort_auto = false;
                let result = {
                    let mut manager = self.subagent_manager.write().await;
                    manager.spawn_background(
                        Arc::clone(&self.subagent_manager),
                        runtime,
                        SubAgentType::General,
                        prompt.clone(),
                        None,
                    )
                };
                match result {
                    Ok(snapshot) => {
                        let _ = self
                            .tx_event
                            .send(Event::status(format!(
                                "Spawned sub-agent {}",
                                snapshot.agent_id
                            )))
                            .await;
                    }
                    Err(err) => {
                        let _ = self
                            .tx_event
                            .send(Event::error(ErrorEnvelope::fatal(format!(
                                "Failed to spawn sub-agent: {err}"
                            ))))
                            .await;
                    }
                }
            }
            Op::ListSubAgents => {
                let agents = {
                    let mut manager = self.subagent_manager.write().await;
                    // Drop finished agents older than an hour before listing.
                    manager.cleanup(Duration::from_secs(60 * 60));
                    manager.list()
                };
                let _ = self.tx_event.send(Event::AgentList { agents }).await;
            }
            Op::ChangeMode { mode } => {
                let _ = self
                    .tx_event
                    .send(Event::status(format!("Mode changed to: {mode:?}")))
                    .await;
            }
            Op::SetModel { model } => {
                // "auto" (any case) switches on automatic model routing.
                self.session.auto_model = model.trim().eq_ignore_ascii_case("auto");
                self.session.model = model;
                self.config.model.clone_from(&self.session.model);
                let _ = self
                    .tx_event
                    .send(Event::status(format!(
                        "Model set to: {}",
                        self.session.model
                    )))
                    .await;
            }
            Op::SetCompaction { config } => {
                let enabled = config.enabled;
                self.config.compaction = config;
                let _ = self
                    .tx_event
                    .send(Event::status(format!(
                        "Auto-compaction {}",
                        if enabled { "enabled" } else { "disabled" }
                    )))
                    .await;
            }
            Op::SyncSession {
                session_id,
                messages,
                system_prompt,
                model,
                workspace,
            } => {
                if let Some(session_id) = session_id {
                    self.session.id = session_id;
                } else if messages.is_empty() && system_prompt.is_none() {
                    // A fully empty sync with no id means "start fresh".
                    self.session.id = uuid::Uuid::new_v4().to_string();
                }
                self.session.messages = messages;
                self.session.compaction_summary_prompt =
                    extract_compaction_summary_prompt(system_prompt.clone());
                self.session.system_prompt = system_prompt;
                self.session.auto_model = model.trim().eq_ignore_ascii_case("auto");
                self.session.model = model;
                self.session.workspace = workspace.clone();
                self.config.model.clone_from(&self.session.model);
                self.config.workspace = workspace.clone();
                // Reload project instructions for the (possibly new) workspace.
                let ctx = crate::project_context::load_project_context_with_parents(&workspace);
                self.session.project_context = if ctx.has_instructions() {
                    Some(ctx)
                } else {
                    None
                };
                self.session.rebuild_working_set();
                self.rehydrate_latest_canonical_state();
                self.emit_session_updated().await;
                let _ = self
                    .tx_event
                    .send(Event::status("Session context synced".to_string()))
                    .await;
            }
            Op::CompactContext => {
                self.handle_manual_compaction().await;
            }
            Op::Rlm {
                content,
                model,
                child_model,
                max_depth,
            } => {
                self.handle_rlm(content, model, child_model, max_depth)
                    .await;
            }
            Op::EditLastTurn { new_message } => {
                // Drop the most recent user message and everything after it,
                // then re-run the turn with the edited message.
                let mut cut = None;
                for (idx, msg) in self.session.messages.iter().enumerate().rev() {
                    if msg.role == "user" {
                        cut = Some(idx);
                        break;
                    }
                }
                if let Some(idx) = cut {
                    self.session.messages.truncate(idx);
                }
                let mode = AppMode::Agent;
                self.handle_send_message(
                    new_message,
                    mode,
                    self.session.model.clone(),
                    self.config.goal_objective.clone(),
                    self.session.reasoning_effort.clone(),
                    self.session.reasoning_effort_auto,
                    self.session.auto_model,
                    self.session.allow_shell,
                    self.session.trust_mode,
                    self.session.auto_approve,
                    self.session.approval_mode,
                    self.config.translation_enabled,
                )
                .await;
            }
            Op::Shutdown => {
                break;
            }
        }
    }
    // Loop exited (channel closed or Shutdown): stop all MCP servers.
    if let Some(pool) = self.mcp_pool.as_ref() {
        let mut guard = pool.lock().await;
        guard.shutdown_all().await;
    }
}
/// Broadcast a full snapshot of the current session to event listeners.
/// Send errors are ignored: the receiver may already be gone during shutdown.
async fn emit_session_updated(&self) {
    let snapshot = Event::SessionUpdated {
        session_id: self.session.id.clone(),
        messages: self.session.messages.clone(),
        system_prompt: self.session.system_prompt.clone(),
        model: self.session.model.clone(),
        workspace: self.session.workspace.clone(),
    };
    let _ = self.tx_event.send(snapshot).await;
}
/// Append `message` to the session and notify listeners of the new state.
async fn add_session_message(&mut self, message: Message) {
    self.session.add_message(message);
    self.emit_session_updated().await;
}
/// Build the `<turn_meta>` content block injected at the start of each user
/// message: today's local date plus, when non-empty, a working-set summary.
fn turn_metadata_block(&self) -> ContentBlock {
    let today = chrono::Local::now().format("%Y-%m-%d").to_string();
    let mut summary = format!("Current local date: {today}");
    if let Some(block) = self
        .session
        .working_set
        .summary_block(&self.config.workspace)
    {
        let trimmed = block.trim();
        // Blank summaries are dropped rather than emitting a dangling newline.
        if !trimmed.is_empty() {
            summary.push('\n');
            summary.push_str(trimmed);
        }
    }
    ContentBlock::Text {
        text: format!("<turn_meta>\n{summary}\n</turn_meta>"),
        cache_control: None,
    }
}
/// Wrap raw user `text` in a user-role [`Message`], prefixed with the
/// per-turn metadata block.
fn user_text_message_with_turn_metadata(&self, text: String) -> Message {
    let meta = self.turn_metadata_block();
    let body = ContentBlock::Text {
        text,
        cache_control: None,
    };
    Message {
        role: "user".to_string(),
        content: vec![meta, body],
    }
}
/// Run one full user turn: snapshot the workspace, sync session/config from
/// the caller's settings, assemble the tool registry (optionally with
/// sub-agent support), drive the model turn, and emit `TurnComplete`.
///
/// Emits `TurnStarted` early and always ends with a `TurnComplete` event,
/// even on early failure (e.g. missing API client).
#[allow(clippy::too_many_arguments)]
async fn handle_send_message(
    &mut self,
    content: String,
    mode: AppMode,
    model: String,
    goal_objective: Option<String>,
    reasoning_effort: Option<String>,
    reasoning_effort_auto: bool,
    auto_model: bool,
    allow_shell: bool,
    trust_mode: bool,
    auto_approve: bool,
    approval_mode: crate::tui::approval::ApprovalMode,
    translation_enabled: bool,
) {
    self.reset_cancel_token();
    // Drain stale steering input left over from a previous turn.
    while self.rx_steer.try_recv().is_ok() {}
    let mut turn = TurnContext::new(self.config.max_steps);
    self.turn_counter = self.turn_counter.saturating_add(1);
    self.capacity_controller.mark_turn_start(self.turn_counter);
    let _ = self
        .tx_event
        .send(Event::TurnStarted {
            turn_id: turn.id.clone(),
        })
        .await;
    if self.config.snapshots_enabled {
        // Pre-turn snapshot is awaited: it must finish before tools can
        // mutate the workspace.
        let pre_workspace = self.session.workspace.clone();
        let pre_seq = self.turn_counter;
        let pre_cap = self.config.snapshots_max_workspace_bytes;
        let _ = tokio::task::spawn_blocking(move || {
            pre_turn_snapshot(&pre_workspace, pre_seq, pre_cap)
        })
        .await;
    }
    crate::retry_status::clear();
    // No API client: report a fatal auth error and close the turn as failed.
    if self.deepseek_client.is_none() {
        let message = self
            .deepseek_client_error
            .as_deref()
            .map(|err| format!("Failed to send message: {err}"))
            .unwrap_or_else(|| "Failed to send message: API client not configured".to_string());
        let _ = self
            .tx_event
            .send(Event::error(ErrorEnvelope::fatal_auth(message.clone())))
            .await;
        let _ = self
            .tx_event
            .send(Event::TurnComplete {
                usage: turn.usage.clone(),
                status: TurnOutcomeStatus::Failed,
                error: Some(message),
            })
            .await;
        return;
    }
    self.session
        .working_set
        .observe_user_message(&content, &self.session.workspace);
    let force_update_plan_first = should_force_update_plan_first(mode, &content);
    let user_msg = self.user_text_message_with_turn_metadata(content);
    self.session.add_message(user_msg);
    // Mirror caller-provided settings into session and config so subsequent
    // ops and tools observe a consistent view.
    self.session.model = model;
    self.config.model.clone_from(&self.session.model);
    self.config.goal_objective = goal_objective;
    self.session.reasoning_effort = reasoning_effort;
    self.session.reasoning_effort_auto = reasoning_effort_auto;
    self.session.auto_model = auto_model;
    self.session.allow_shell = allow_shell;
    self.config.allow_shell = allow_shell;
    self.session.trust_mode = trust_mode;
    self.config.trust_mode = trust_mode;
    self.config.translation_enabled = translation_enabled;
    self.session.auto_approve = auto_approve;
    // auto_approve overrides whatever approval mode the caller passed.
    self.session.approval_mode = if auto_approve {
        crate::tui::approval::ApprovalMode::Auto
    } else {
        approval_mode
    };
    self.refresh_system_prompt(mode);
    self.emit_session_updated().await;
    let todo_list = self.config.todos.clone();
    let plan_state = self.config.plan_state.clone();
    let tool_context = self.build_tool_context(mode, auto_approve);
    let builder = self.build_turn_tool_registry_builder(mode, todo_list, plan_state);
    // Sub-agent forks inherit a structured snapshot of the current state.
    let fork_context_for_runtime = if self.config.features.enabled(Feature::Subagents) {
        let state = StructuredState::capture(
            mode.label(),
            self.config.workspace.clone(),
            std::env::current_dir().ok(),
            &self.session.working_set,
            &self.config.todos,
            &self.config.plan_state,
            Some(&self.subagent_manager),
        )
        .await;
        Some(SubAgentForkContext {
            system: self.session.system_prompt.clone(),
            messages: self.messages_with_turn_metadata(),
            structured_state_block: state.to_system_block(),
        })
    } else {
        None
    };
    // Mailbox drainer forwards sub-agent mail to the event stream; it stops
    // when the event channel closes or its child token is cancelled.
    let mailbox_for_runtime = if self.config.features.enabled(Feature::Subagents) {
        let cancel_token = self.cancel_token.child_token();
        let (mailbox, mut receiver) = Mailbox::new(cancel_token.clone());
        let tx_event_clone = self.tx_event.clone();
        spawn_supervised(
            "subagent-mailbox-drainer",
            std::panic::Location::caller(),
            async move {
                while let Some(envelope) = receiver.recv().await {
                    if tx_event_clone
                        .send(Event::SubAgentMailbox {
                            seq: envelope.seq,
                            message: envelope.message,
                        })
                        .await
                        .is_err()
                    {
                        break;
                    }
                }
            },
        );
        Some((mailbox, cancel_token))
    } else {
        None
    };
    let tool_registry = match mode {
        AppMode::Agent | AppMode::Yolo => {
            if self.config.features.enabled(Feature::Subagents) {
                // Client is guaranteed Some here: the None case returned
                // earlier in this function.
                let runtime = if let Some(client) = self.deepseek_client.clone() {
                    let mut rt = SubAgentRuntime::new(
                        client,
                        self.session.model.clone(),
                        tool_context.clone(),
                        self.session.allow_shell,
                        Some(self.tx_event.clone()),
                        Arc::clone(&self.subagent_manager),
                    )
                    .with_role_models(self.config.subagent_model_overrides.clone())
                    .with_auto_model(self.session.auto_model)
                    .with_reasoning_effort(
                        self.session.reasoning_effort.clone(),
                        self.session.reasoning_effort_auto,
                    )
                    .with_max_spawn_depth(self.config.max_spawn_depth)
                    .with_parent_completion_tx(self.tx_subagent_completion.clone());
                    if let Some(context) = fork_context_for_runtime.clone() {
                        rt = rt.with_fork_context(context);
                    }
                    if let Some((mailbox, cancel_token)) = mailbox_for_runtime.as_ref() {
                        rt = rt
                            .with_mailbox(mailbox.clone())
                            .with_cancel_token(cancel_token.clone());
                    }
                    Some(rt)
                } else {
                    None
                };
                Some(
                    builder
                        .with_subagent_tools(
                            self.subagent_manager.clone(),
                            runtime.expect("sub-agent runtime should exist with active client"),
                        )
                        .build(tool_context),
                )
            } else {
                Some(builder.build(tool_context))
            }
        }
        _ => Some(builder.build(tool_context)),
    };
    let mcp_tools = if self.config.features.enabled(Feature::Mcp) {
        self.mcp_tools().await
    } else {
        Vec::new()
    };
    let tools = tool_registry.as_ref().map(|registry| {
        build_model_tool_catalog(registry.to_api_tools_with_cache(true), mcp_tools, mode)
    });
    let (status, error) = self
        .handle_deepseek_turn(
            &mut turn,
            tool_registry.as_ref(),
            tools,
            mode,
            force_update_plan_first,
        )
        .await;
    // Cycle advancement only happens after a fully successful turn.
    if matches!(status, TurnOutcomeStatus::Completed) {
        self.maybe_advance_cycle(mode).await;
    }
    self.session.total_usage.add(&turn.usage);
    let _ = self
        .tx_event
        .send(Event::TurnComplete {
            usage: turn.usage,
            status,
            error,
        })
        .await;
    if self.config.snapshots_enabled {
        // Post-turn snapshot is fire-and-forget, unlike the pre-turn one.
        let post_workspace = self.session.workspace.clone();
        let post_seq = self.turn_counter;
        let post_cap = self.config.snapshots_max_workspace_bytes;
        crate::utils::spawn_blocking_supervised("post-turn-snapshot", move || {
            post_turn_snapshot(&post_workspace, post_seq, post_cap);
        });
    }
}
/// Handle the user-initiated `Op::CompactContext`: run a compaction pass over
/// session messages, emit started/completed/failed compaction events, and
/// always close with a zero-usage `TurnComplete`.
async fn handle_manual_compaction(&mut self) {
    let id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]);
    // Compaction consumes no model budget attributable to this "turn".
    let zero_usage = Usage {
        input_tokens: 0,
        output_tokens: 0,
        ..Usage::default()
    };
    let Some(client) = self.deepseek_client.clone() else {
        let message = "Manual compaction unavailable: API client not configured".to_string();
        self.emit_compaction_failed(id, false, message.clone())
            .await;
        let _ = self
            .tx_event
            .send(Event::error(ErrorEnvelope::fatal_auth(message.clone())))
            .await;
        let _ = self
            .tx_event
            .send(Event::TurnComplete {
                usage: zero_usage,
                status: TurnOutcomeStatus::Failed,
                error: Some(message),
            })
            .await;
        return;
    };
    let start_message = "Manual context compaction started".to_string();
    self.emit_compaction_started(id.clone(), false, start_message)
        .await;
    // Pin working-set-relevant messages and paths so compaction keeps them.
    let compaction_pins = self
        .session
        .working_set
        .pinned_message_indices(&self.session.messages, &self.session.workspace);
    let compaction_paths = self.session.working_set.top_paths(24);
    let messages_before = self.session.messages.len();
    let mut turn_status = TurnOutcomeStatus::Completed;
    let mut turn_error = None;
    match compact_messages_safe(
        &client,
        &self.session.messages,
        &self.config.compaction,
        Some(&self.session.workspace),
        Some(&compaction_pins),
        Some(&compaction_paths),
    )
    .await
    {
        Ok(result) => {
            // Guard against a degenerate pass that would wipe a non-empty
            // history.
            if !result.messages.is_empty() || self.session.messages.is_empty() {
                let messages_after = result.messages.len();
                self.session.messages = result.messages;
                self.merge_compaction_summary(result.summary_prompt);
                self.emit_session_updated().await;
                let removed = messages_before.saturating_sub(messages_after);
                let message = if result.retries_used > 0 {
                    format!(
                        "Compaction complete: {messages_before} → {messages_after} messages ({removed} removed, {} retries)",
                        result.retries_used
                    )
                } else {
                    format!(
                        "Compaction complete: {messages_before} → {messages_after} messages ({removed} removed)"
                    )
                };
                self.emit_compaction_completed(
                    id,
                    false,
                    message,
                    Some(messages_before),
                    Some(messages_after),
                )
                .await;
            } else {
                let message = "Compaction skipped: produced empty result".to_string();
                self.emit_compaction_failed(id, false, message.clone())
                    .await;
                turn_status = TurnOutcomeStatus::Failed;
                turn_error = Some(message);
            }
        }
        Err(err) => {
            let message = format!("Manual context compaction failed: {err}");
            self.emit_compaction_failed(id, false, message.clone())
                .await;
            let _ = self.tx_event.send(Event::status(message.clone())).await;
            turn_status = TurnOutcomeStatus::Failed;
            turn_error = Some(message);
        }
    }
    let _ = self
        .tx_event
        .send(Event::TurnComplete {
            usage: zero_usage,
            status: turn_status,
            error: turn_error,
        })
        .await;
}
/// Handle `Op::Rlm`: run a recursive-LM turn via `run_rlm_turn`, stream any
/// non-empty answer into the session and event stream, and close with a
/// `TurnComplete` reflecting whether the RLM turn errored.
async fn handle_rlm(
    &mut self,
    content: String,
    model: String,
    child_model: String,
    max_depth: u32,
) {
    use crate::rlm::turn::run_rlm_turn;
    let Some(ref client) = self.deepseek_client else {
        let err = self
            .deepseek_client_error
            .as_deref()
            .map(|s| s.to_string())
            .unwrap_or_else(|| "API client not configured".to_string());
        let _ = self
            .tx_event
            .send(Event::error(ErrorEnvelope::fatal_auth(format!(
                "RLM error: {err}"
            ))))
            .await;
        return;
    };
    let _ = self
        .tx_event
        .send(Event::status("RLM turn started".to_string()))
        .await;
    let result = run_rlm_turn(
        client,
        model,
        content,
        child_model,
        self.tx_event.clone(),
        max_depth,
    )
    .await;
    let has_error = result.error.is_some();
    if let Some(ref err) = result.error {
        let _ = self
            .tx_event
            .send(Event::error(ErrorEnvelope::tool(format!(
                "RLM error: {err}"
            ))))
            .await;
    }
    // An answer may exist even when the turn also reported an error.
    if !result.answer.is_empty() {
        self.add_session_message(crate::models::Message {
            role: "assistant".to_string(),
            content: vec![crate::models::ContentBlock::Text {
                text: result.answer.clone(),
                cache_control: None,
            }],
        })
        .await;
        // Emit the full answer as a single delta followed by completion.
        let _ = self
            .tx_event
            .send(Event::MessageDelta {
                index: 0,
                content: result.answer.clone(),
            })
            .await;
        let _ = self
            .tx_event
            .send(Event::MessageComplete { index: 0 })
            .await;
    }
    let _ = self
        .tx_event
        .send(Event::TurnComplete {
            usage: result.usage,
            status: if has_error {
                crate::core::events::TurnOutcomeStatus::Failed
            } else {
                crate::core::events::TurnOutcomeStatus::Completed
            },
            error: result.error,
        })
        .await;
}
/// Conservative token estimate for the current messages + system prompt,
/// used for budget checks before/after compaction and trimming.
fn estimated_input_tokens(&self) -> usize {
    estimate_input_tokens_conservative(
        &self.session.messages,
        self.session.system_prompt.as_ref(),
    )
}
/// Drop messages from the front of history, oldest first, until the estimated
/// input tokens fit `target_input_budget` or only `MIN_RECENT_MESSAGES_TO_KEEP`
/// remain. Returns how many messages were removed.
///
/// The estimate is recomputed after every removal, since per-message cost is
/// not uniform.
fn trim_oldest_messages_to_budget(&mut self, target_input_budget: usize) -> usize {
    let mut dropped = 0usize;
    loop {
        if self.session.messages.len() <= MIN_RECENT_MESSAGES_TO_KEEP {
            break;
        }
        if self.estimated_input_tokens() <= target_input_budget {
            break;
        }
        self.session.messages.remove(0);
        dropped = dropped.saturating_add(1);
    }
    dropped
}
/// Emergency recovery when a request overflows the model's context window.
///
/// Attempts, in order: an API-driven compaction pass (with the compaction
/// config force-enabled and its threshold clamped under the budget), then a
/// local trim of the oldest messages. Emits CompactionStarted /
/// CompactionCompleted / CompactionFailed plus status events along the way.
///
/// Returns `true` when the post-recovery token estimate fits the budget AND
/// something actually shrank (fewer tokens, fewer messages, or a trim);
/// `false` when no budget is known for the model or recovery failed.
async fn recover_context_overflow(
&mut self,
client: &DeepSeekClient,
reason: &str,
requested_output_tokens: u32,
) -> bool {
// Unknown model limits → no sensible target; give up early.
let Some(target_budget) =
context_input_budget(&self.session.model, requested_output_tokens)
else {
return false;
};
let id = format!("compact_{}", &uuid::Uuid::new_v4().to_string()[..8]);
let start_message = format!("Emergency context compaction started ({reason})");
self.emit_compaction_started(id.clone(), true, start_message)
.await;
let before_tokens = self.estimated_input_tokens();
let before_count = self.session.messages.len();
let mut retries_used = 0u32;
let mut summary_prompt = None;
let mut compacted_messages = self.session.messages.clone();
// Force compaction regardless of user configuration: enable it and pull
// the threshold below the target budget (but keep it at least 1).
let mut forced_config = self.config.compaction.clone();
forced_config.enabled = true;
forced_config.token_threshold = forced_config
.token_threshold
.min(target_budget.saturating_sub(1))
.max(1);
forced_config.auto_floor_tokens = 0;
match compact_messages_safe(
client,
&self.session.messages,
&forced_config,
Some(&self.session.workspace),
None,
None,
)
.await
{
Ok(result) => {
retries_used = result.retries_used;
compacted_messages = result.messages;
summary_prompt = result.summary_prompt;
}
Err(err) => {
// API pass failed; keep the original messages and rely on the
// local trim below.
let _ = self
.tx_event
.send(Event::status(format!(
"Emergency compaction API pass failed: {err}. Falling back to local trim."
)))
.await;
}
}
// Never let a compaction result wipe a non-empty history.
if !compacted_messages.is_empty() || self.session.messages.is_empty() {
self.session.messages = compacted_messages;
}
self.merge_compaction_summary(summary_prompt);
let trimmed = self.trim_oldest_messages_to_budget(target_budget);
self.emit_session_updated().await;
let after_tokens = self.estimated_input_tokens();
let after_count = self.session.messages.len();
// Success requires both fitting the budget and measurable shrinkage.
let recovered = after_tokens <= target_budget
&& (after_tokens < before_tokens || after_count < before_count || trimmed > 0);
if recovered {
let removed = before_count.saturating_sub(after_count);
let mut details = format!(
"Emergency compaction complete: {before_count} → {after_count} messages ({removed} removed), ~{before_tokens} → ~{after_tokens} tokens"
);
if retries_used > 0 {
details.push_str(&format!(" ({} retries)", retries_used));
}
if trimmed > 0 {
details.push_str(&format!(", trimmed {trimmed} oldest"));
}
self.emit_compaction_completed(
id,
true,
details.clone(),
Some(before_count),
Some(after_count),
)
.await;
let _ = self.tx_event.send(Event::status(details)).await;
return true;
}
let message = format!(
"Emergency context compaction failed to reduce request below model limit \
(estimate ~{} tokens, budget ~{}).",
after_tokens, target_budget
);
self.emit_compaction_failed(id, true, message.clone()).await;
let _ = self.tx_event.send(Event::status(message)).await;
false
}
/// Assembles the `ToolContext` handed to tool executions for this session.
///
/// Auto-approval is granted in Yolo mode or when `auto_approve` is set;
/// optional facilities (memory path, network policy, large-output routing,
/// sandbox backend) are attached only when configured. Plan mode additionally
/// gets an elevated read-only sandbox hint for blocked shell commands.
fn build_tool_context(&self, mode: AppMode, auto_approve: bool) -> ToolContext {
    // Workspace-level trust contributes extra external paths tools may touch.
    let trust = crate::workspace_trust::WorkspaceTrust::load_for(&self.session.workspace);
    let approve_all = mode == AppMode::Yolo || auto_approve;
    let mut context = ToolContext::with_auto_approve(
        self.session.workspace.clone(),
        self.session.trust_mode,
        self.session.notes_path.clone(),
        self.session.mcp_config_path.clone(),
        approve_all,
    );
    context = context
        .with_state_namespace(self.session.id.clone())
        .with_features(self.config.features.clone())
        .with_shell_manager(self.shell_manager.clone())
        .with_runtime_services(self.config.runtime_services.clone())
        .with_cancel_token(self.cancel_token.clone())
        .with_trusted_external_paths(trust.paths().to_vec());
    if self.config.memory_enabled {
        context.memory_path = Some(self.config.memory_path.clone());
    }
    if let Some(decider) = self.config.network_policy.as_ref() {
        context = context.with_network_policy(decider.clone());
    }
    if let Some(workshop_cfg) = self.config.workshop.as_ref()
        && let Some(vars_arc) = self.workshop_vars.as_ref()
    {
        let router =
            crate::tools::large_output_router::LargeOutputRouter::new(workshop_cfg.clone());
        context = context.with_large_output_router(router, vars_arc.clone());
    }
    if let Some(backend) = self.sandbox_backend.as_ref() {
        context = context.with_sandbox_backend(std::sync::Arc::clone(backend));
    }
    context.search_provider = self.config.search_provider;
    context.search_api_key = self.config.search_api_key.clone();
    let policy = sandbox_policy_for_mode(mode, &self.session.workspace);
    let mut context = context.with_elevated_sandbox_policy(policy);
    if matches!(mode, AppMode::Plan) {
        context = context.with_shell_network_denied_hint(
            "Shell command blocked: Plan mode runs shell commands in a read-only sandbox — no writes, no network. Use Agent mode (`/mode agent`) for any command that creates or modifies files, or that needs network access.",
        );
    }
    context
}
/// Lazily initializes and returns the shared MCP pool.
///
/// On first use, loads the pool from the session's MCP config path (a load
/// failure is surfaced as `ToolError::execution_failed`) and attaches the
/// configured network-policy decider, if any. Subsequent calls clone the
/// cached `Arc`.
async fn ensure_mcp_pool(&mut self) -> Result<Arc<AsyncMutex<McpPool>>, ToolError> {
    if self.mcp_pool.is_none() {
        let mut pool = McpPool::from_config_path(&self.session.mcp_config_path)
            .map_err(|e| ToolError::execution_failed(format!("Failed to load MCP config: {e}")))?;
        if let Some(decider) = self.config.network_policy.as_ref() {
            pool = pool.with_network_policy(decider.clone());
        }
        self.mcp_pool = Some(Arc::new(AsyncMutex::new(pool)));
    }
    let pool = self
        .mcp_pool
        .as_ref()
        .expect("mcp_pool was just initialized above");
    Ok(Arc::clone(pool))
}
/// Connects every configured MCP server and returns their tools in API form.
///
/// A pool-load failure or per-server connection failure is reported as a
/// status event; load failure yields an empty tool list, while individual
/// connection failures leave the remaining servers' tools available.
async fn mcp_tools(&mut self) -> Vec<Tool> {
    let pool = match self.ensure_mcp_pool().await {
        Ok(pool) => pool,
        Err(err) => {
            let _ = self.tx_event.send(Event::status(err.to_string())).await;
            return Vec::new();
        }
    };
    let mut guard = pool.lock().await;
    for (server, err) in guard.connect_all().await {
        let _ = self
            .tx_event
            .send(Event::status(format!(
                "Failed to connect MCP server '{server}': {err:#}"
            )))
            .await;
    }
    guard.to_api_tools()
}
/// Produces a layered "context seam" — a condensed summary of older
/// conversation — when the current token estimate crosses a seam level.
///
/// No-ops when the seam manager is absent or disabled, when no level is due
/// for the current estimate, or when the verbatim window already covers the
/// whole history (`verbatim_start == 0`). Otherwise it summarizes messages
/// `0..verbatim_start` — a fresh soft seam when no seams exist yet, or a
/// recompaction of existing seam texts plus the covered messages — and
/// appends the seam text to the session as an assistant message. Failures
/// are logged and abort silently.
#[allow(clippy::too_many_lines)]
async fn layered_context_checkpoint(&mut self) {
let Some(ref seam_mgr) = self.seam_manager else {
return;
};
if !seam_mgr.config().enabled {
return;
}
let highest = seam_mgr.highest_level().await;
// Which seam level (if any) the current token estimate triggers.
let Some(level) = seam_mgr.seam_level_for(self.estimated_input_tokens(), highest) else {
return;
};
let msg_count = self.session.messages.len();
let verbatim_start = seam_mgr.verbatim_window_start(msg_count);
// Everything still fits in the verbatim window — nothing to summarize.
if verbatim_start == 0 {
return; }
let msg_range_end = verbatim_start;
// Indices of messages the working set wants kept intact through seams.
let pinned = self
.session
.working_set
.pinned_message_indices(&self.session.messages, &self.session.workspace);
let _ = self
.tx_event
.send(Event::status(format!(
"⏻ producing L{level} context seam ({msg_range_end} messages)…"
)))
.await;
let existing_seams = seam_mgr.collect_seam_texts(&self.session.messages).await;
// First seam: summarize from scratch. Later seams: recompact the existing
// seam texts together with the covered messages.
let seam_text = if existing_seams.is_empty() {
match seam_mgr
.produce_soft_seam(
&self.session.messages,
level,
0,
msg_range_end,
Some(&self.session.workspace),
&pinned,
)
.await
{
Ok(text) => text,
Err(err) => {
crate::logging::warn(format!("L{level} soft seam failed: {err}"));
return;
}
}
} else {
let recent: Vec<&Message> = (0..msg_range_end)
.filter_map(|i| self.session.messages.get(i))
.collect();
match seam_mgr
.recompact(&existing_seams, &recent, level, 0, msg_range_end)
.await
{
Ok(text) => text,
Err(err) => {
crate::logging::warn(format!("L{level} recompact failed: {err}"));
return;
}
}
};
if seam_text.is_empty() {
return;
}
let seam_count = seam_mgr.seam_count().await;
// The seam lives inline in history as an assistant message.
self.add_session_message(Message {
role: "assistant".to_string(),
content: vec![ContentBlock::Text {
text: seam_text,
cache_control: None,
}],
})
.await;
let _ = self
.tx_event
.send(Event::status(format!(
"⏻ L{level} seam complete ({seam_count} total, {msg_range_end} messages covered)"
)))
.await;
}
/// Advances the session to a new "cycle" when the context nears capacity.
///
/// Flow: check `should_advance_cycle` against the token estimate and response
/// headroom; generate a carry-over briefing (flash briefing from seam texts
/// plus captured structured state when a seam manager exists, falling back to
/// a main-model `produce_briefing` turn); archive the old cycle (best-effort);
/// replace the message list with seed messages (structured-state block +
/// briefing); bump cycle counters; reset seams and the compaction summary;
/// refresh the system prompt; and emit `CycleAdvanced` plus status events.
/// Briefing failures abort the advance and keep the current cycle.
async fn maybe_advance_cycle(&mut self, mode: AppMode) {
if !should_advance_cycle(
self.estimated_input_tokens() as u64,
turn_response_headroom_tokens(),
&self.session.model,
&self.config.cycle,
false,
) {
return;
}
// A briefing turn needs the API client; without it, stay in this cycle.
let Some(client) = self.deepseek_client.clone() else {
crate::logging::warn(
"Cycle boundary skipped: API client not configured for briefing turn",
);
return;
};
let from = self.session.cycle_count;
let to = from.saturating_add(1);
let archive_started = self.session.current_cycle_started;
let max_briefing_tokens = self.config.cycle.briefing_max_for(&self.session.model);
let _ = self
.tx_event
.send(Event::status(format!(
"↻ context refreshing (cycle {from} → {to}, generating briefing…)"
)))
.await;
// Prefer a cheap "flash" briefing built from existing seam texts and the
// structured state; fall back to a full main-model briefing turn, and
// abort the advance entirely if that fails too.
let briefing_text = if let Some(ref seam_mgr) = self.seam_manager {
let seams = seam_mgr.collect_seam_texts(&self.session.messages).await;
let state_text = {
let s = StructuredState::capture(
mode.label(),
self.config.workspace.clone(),
std::env::current_dir().ok(),
&self.session.working_set,
&self.config.todos,
&self.config.plan_state,
Some(&self.subagent_manager),
)
.await;
s.to_system_block()
};
match seam_mgr
.produce_flash_briefing(&seams, state_text.as_deref())
.await
{
Ok(text) => text,
Err(err) => {
crate::logging::warn(format!(
"Flash briefing failed, falling back to main model: {err}"
));
match produce_briefing(
&client,
&self.session.model,
&self.session.messages,
max_briefing_tokens,
)
.await
{
Ok(text) => text,
Err(err2) => {
crate::logging::warn(format!(
"Cycle briefing turn failed; skipping cycle advance: {err2}"
));
let _ = self
.tx_event
.send(Event::status(format!(
"↻ cycle handoff failed (continuing in cycle {from}): {err2}"
)))
.await;
return;
}
}
}
}
} else {
match produce_briefing(
&client,
&self.session.model,
&self.session.messages,
max_briefing_tokens,
)
.await
{
Ok(text) => text,
Err(err) => {
crate::logging::warn(format!(
"Cycle briefing turn failed; skipping cycle advance: {err}"
));
let _ = self
.tx_event
.send(Event::status(format!(
"↻ cycle handoff failed (continuing in cycle {from}): {err}"
)))
.await;
return;
}
}
};
let briefing_tokens = estimate_briefing_tokens(&briefing_text);
let now = chrono::Utc::now();
let briefing = CycleBriefing {
cycle: to,
timestamp: now,
briefing_text: briefing_text.clone(),
token_estimate: briefing_tokens,
};
// Archiving is best-effort: failure is logged but never blocks the swap.
match archive_cycle(
&self.session.id,
to,
&self.session.messages,
&self.session.model,
archive_started,
) {
Ok(path) => {
crate::logging::info(format!("Cycle {to} archived to {}", path.display()));
}
Err(err) => {
crate::logging::warn(format!(
"Failed to archive cycle {to}; continuing with swap: {err}"
));
}
}
let state = StructuredState::capture(
mode.label(),
self.config.workspace.clone(),
std::env::current_dir().ok(),
&self.session.working_set,
&self.config.todos,
&self.config.plan_state,
Some(&self.subagent_manager),
)
.await;
let state_block = state.to_system_block();
// Replace the whole history with seed messages for the new cycle.
let seed_messages = build_seed_messages(
state_block.as_deref(),
Some(&briefing),
None, );
self.session.messages = seed_messages;
self.session.cycle_count = to;
self.session.current_cycle_started = now;
self.session.cycle_briefings.push(briefing.clone());
// Seams and the compaction summary describe the old history; drop them.
if let Some(ref seam_mgr) = self.seam_manager {
seam_mgr.reset().await;
}
self.session.compaction_summary_prompt = None;
self.refresh_system_prompt(mode);
self.emit_session_updated().await;
let _ = self
.tx_event
.send(Event::CycleAdvanced {
from,
to,
briefing: briefing.clone(),
})
.await;
let _ = self
.tx_event
.send(Event::status(format!(
"↻ context refreshed (cycle {from} → {to}, briefing: {briefing_tokens} tokens carried)"
)))
.await;
}
/// Rebuilds the system prompt for `mode` (base prompt merged with any
/// compaction summary) and stores it on the session — but only when the
/// merged prompt's hash differs from the last stored hash, so an unchanged
/// prompt keeps its identity.
fn refresh_system_prompt(&mut self, mode: AppMode) {
    let memory_block =
        crate::memory::compose_block(self.config.memory_enabled, &self.config.memory_path);
    let session_ctx = prompts::PromptSessionContext {
        user_memory_block: memory_block.as_deref(),
        goal_objective: self.config.goal_objective.as_deref(),
        project_context_pack_enabled: self.config.project_context_pack_enabled,
        locale_tag: &self.config.locale_tag,
        translation_enabled: self.config.translation_enabled,
    };
    let base = prompts::system_prompt_for_mode_with_context_skills_session_and_approval(
        mode,
        &self.config.workspace,
        None,
        Some(&self.config.skills_dir),
        Some(&self.config.instructions),
        session_ctx,
        self.session.approval_mode,
    );
    let merged =
        merge_system_prompts(Some(&base), self.session.compaction_summary_prompt.clone());
    let hash = system_prompt_hash(merged.as_ref());
    if self.session.last_system_prompt_hash == Some(hash) {
        return;
    }
    self.session.system_prompt = merged;
    self.session.last_system_prompt_hash = Some(hash);
}
/// Folds a compaction summary prompt into both the persistent
/// `compaction_summary_prompt` and the live system prompt, updating the
/// stored prompt hash to match. No-op when `summary_prompt` is `None`.
fn merge_compaction_summary(&mut self, summary_prompt: Option<SystemPrompt>) {
    let Some(summary) = summary_prompt else {
        return;
    };
    self.session.compaction_summary_prompt = merge_system_prompts(
        self.session.compaction_summary_prompt.as_ref(),
        Some(summary.clone()),
    );
    let merged = merge_system_prompts(self.session.system_prompt.as_ref(), Some(summary));
    self.session.last_system_prompt_hash = Some(system_prompt_hash(merged.as_ref()));
    self.session.system_prompt = merged;
}
}
/// Stable hash of a system prompt, used to detect whether the prompt changed.
///
/// Each variant is tagged with a distinct discriminant byte (0 = Text,
/// 1 = Blocks, 2 = None) so that, e.g., an absent prompt never hashes like an
/// empty text prompt. For block prompts, the block type, text, and — when
/// present — the cache-control type are folded in.
fn system_prompt_hash(prompt: Option<&SystemPrompt>) -> u64 {
    let mut hasher = DefaultHasher::new();
    match prompt {
        // `Hasher::write_u8` produces the same byte stream as `u8::hash`.
        None => hasher.write_u8(2),
        Some(SystemPrompt::Text(text)) => {
            hasher.write_u8(0);
            text.hash(&mut hasher);
        }
        Some(SystemPrompt::Blocks(blocks)) => {
            hasher.write_u8(1);
            for block in blocks {
                block.block_type.hash(&mut hasher);
                block.text.hash(&mut hasher);
                if let Some(cache_control) = block.cache_control.as_ref() {
                    cache_control.cache_type.hash(&mut hasher);
                }
            }
        }
    }
    hasher.finish()
}
/// Constructs an `Engine` from the given configs and runs its event loop on a
/// supervised background task, returning the handle for driving it.
pub fn spawn_engine(config: EngineConfig, api_config: &Config) -> EngineHandle {
    let (engine, handle) = Engine::new(config, api_config);
    let event_loop = async move { engine.run().await };
    spawn_supervised("engine-event-loop", std::panic::Location::caller(), event_loop);
    handle
}
#[cfg(test)]
/// Test-side bundle: an `EngineHandle` plus the opposite ends of its channels,
/// so tests can observe ops, approvals, and steering messages sent through the
/// handle, and inject events/cancellation as if they came from the engine.
pub(crate) struct MockEngineHandle {
pub handle: EngineHandle,
// Receives ops sent via `handle.tx_op`.
pub rx_op: mpsc::Receiver<Op>,
// Receives approval decisions sent via `handle.tx_approval`.
rx_approval: mpsc::Receiver<ApprovalDecision>,
// Receives steering strings sent via `handle.tx_steer`.
pub rx_steer: mpsc::Receiver<String>,
// Sender paired with `handle.rx_event`; tests inject engine events here.
pub tx_event: mpsc::Sender<Event>,
// Clone of the token stored inside `handle.cancel_token`.
pub cancel_token: CancellationToken,
}
#[cfg(test)]
/// Owned, equality-comparable mirror of `ApprovalDecision`, convenient for
/// asserting on approval traffic in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum MockApprovalEvent {
// The request with this id was approved.
Approved {
id: String,
},
// The request with this id was denied.
Denied {
id: String,
},
// The request should be retried under the given sandbox policy.
RetryWithPolicy {
id: String,
policy: crate::sandbox::SandboxPolicy,
},
}
#[cfg(test)]
impl MockEngineHandle {
    /// Awaits the next approval decision from the engine under test and maps
    /// it to a comparable `MockApprovalEvent`. Returns `None` once the
    /// approval channel is closed.
    pub(crate) async fn recv_approval_event(&mut self) -> Option<MockApprovalEvent> {
        let decision = self.rx_approval.recv().await?;
        let event = match decision {
            ApprovalDecision::Approved { id } => MockApprovalEvent::Approved { id },
            ApprovalDecision::Denied { id } => MockApprovalEvent::Denied { id },
            ApprovalDecision::RetryWithPolicy { id, policy } => {
                MockApprovalEvent::RetryWithPolicy { id, policy }
            }
        };
        Some(event)
    }
}
#[cfg(test)]
/// Builds a fully wired `MockEngineHandle`: every channel an `EngineHandle`
/// exposes is created here with test-sized buffers, and the opposite ends are
/// kept so tests can both observe and drive the handle. The user-input
/// receiver is intentionally dropped (tests don't consume it).
pub(crate) fn mock_engine_handle() -> MockEngineHandle {
    let (tx_op, rx_op) = mpsc::channel(32);
    let (tx_event, rx_event) = mpsc::channel(256);
    let (tx_approval, rx_approval) = mpsc::channel(64);
    let (tx_user_input, _rx_user_input) = mpsc::channel(32);
    let (tx_steer, rx_steer) = mpsc::channel(64);
    let cancel_token = CancellationToken::new();
    let handle = EngineHandle {
        tx_op,
        rx_event: Arc::new(RwLock::new(rx_event)),
        cancel_token: Arc::new(StdMutex::new(cancel_token.clone())),
        cancel_reason: Arc::new(StdMutex::new(None)),
        tx_approval,
        tx_user_input,
        tx_steer,
    };
    MockEngineHandle {
        handle,
        rx_op,
        rx_approval,
        rx_steer,
        tx_event,
        cancel_token,
    }
}
mod approval;
mod capacity_flow;
mod context;
pub(crate) use context::compact_tool_result_for_context;
use context::{
COMPACTION_SUMMARY_MARKER, MAX_CONTEXT_RECOVERY_ATTEMPTS, MIN_RECENT_MESSAGES_TO_KEEP,
TURN_MAX_OUTPUT_TOKENS, context_input_budget, effective_max_output_tokens,
estimate_input_tokens_conservative, extract_compaction_summary_prompt,
is_context_length_error_message, summarize_text, turn_response_headroom_tokens,
};
mod dispatch;
mod loop_guard;
mod lsp_hooks;
mod streaming;
mod tool_catalog;
mod tool_execution;
mod tool_setup;
mod turn_loop;
use self::approval::{ApprovalDecision, ApprovalResult, UserInputDecision};
use self::dispatch::{
ParallelToolResult, ParallelToolResultEntry, ToolExecGuard, ToolExecOutcome, ToolExecutionPlan,
caller_allowed_for_tool, caller_type_for_tool_use, final_tool_input, format_tool_error,
mcp_tool_approval_description, mcp_tool_is_parallel_safe, mcp_tool_is_read_only,
parse_parallel_tool_calls, parse_tool_input, should_force_update_plan_first,
should_parallelize_tool_batch, should_stop_after_plan_tool,
};
use self::loop_guard::{AttemptDecision, LoopGuard, OutcomeDecision};
#[cfg(test)]
use self::lsp_hooks::{edited_paths_for_tool, parse_patch_paths};
#[cfg(test)]
use self::streaming::TOOL_CALL_START_MARKERS;
use self::streaming::{
ContentBlockKind, FAKE_WRAPPER_NOTICE, MAX_STREAM_ERRORS_BEFORE_FAIL,
MAX_TRANSPARENT_STREAM_RETRIES, STREAM_MAX_CONTENT_BYTES, STREAM_MAX_DURATION_SECS,
ToolUseState, contains_fake_tool_wrapper, filter_tool_call_delta,
should_transparently_retry_stream, stream_chunk_timeout_secs,
};
use self::tool_catalog::{
CODE_EXECUTION_TOOL_NAME, JS_EXECUTION_TOOL_NAME, MULTI_TOOL_PARALLEL_NAME,
REQUEST_USER_INPUT_NAME, active_tools_for_step, build_model_tool_catalog,
ensure_advanced_tooling, execute_code_execution_tool, execute_tool_search,
initial_active_tools, is_tool_search_tool, maybe_hydrate_requested_deferred_tool,
missing_tool_error_message,
};
#[cfg(test)]
use self::tool_catalog::{
TOOL_SEARCH_BM25_NAME, maybe_activate_requested_deferred_tool, should_default_defer_tool,
};
use self::tool_execution::emit_tool_audit;
use self::tool_setup::sandbox_policy_for_mode;
use crate::tools::js_execution::execute_js_execution_tool;
#[cfg(test)]
mod tests;