#[allow(clippy::wildcard_imports)]
use super::*;
impl LeanCtxServer {
pub(crate) async fn call_tool_guarded(
&self,
request: CallToolRequestParams,
) -> Result<CallToolResult, ErrorData> {
self.check_idle_expiry().await;
self.resolve_roots_once().await;
elicitation::increment_call();
let original_name = request.name.as_ref().to_string();
let (resolved_name, resolved_args) = if original_name == "ctx" {
let sub = request
.arguments
.as_ref()
.and_then(|a| a.get("tool"))
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string)
.ok_or_else(|| {
ErrorData::invalid_params("'tool' is required for ctx meta-tool", None)
})?;
let tool_name = if sub.starts_with("ctx_") {
sub
} else {
format!("ctx_{sub}")
};
let mut args = request.arguments.unwrap_or_default();
args.remove("tool");
(tool_name, Some(args))
} else {
(original_name, request.arguments)
};
let name = resolved_name.as_str();
let args = resolved_args.as_ref();
let role_check = role_guard::check_tool_access(name);
if let Some(denied) = role_guard::into_call_tool_result(&role_check) {
tracing::warn!(
tool = name,
role = %role_check.role_name,
"Tool blocked by role policy"
);
return Ok(denied);
}
if name != "ctx_workflow" {
let active = self.workflow.read().await.clone();
if let Some(run) = active {
if run.current == "done" || is_workflow_stale(&run) {
let mut wf = self.workflow.write().await;
*wf = None;
let _ = crate::core::workflow::clear_active();
} else if !WORKFLOW_PASSTHROUGH_TOOLS.contains(&name) {
if let Some(state) = run.spec.state(&run.current) {
if let Some(allowed) = &state.allowed_tools {
let allowed_ok = allowed.iter().any(|t| t == name);
if !allowed_ok {
let mut shown = allowed.clone();
shown.sort();
shown.truncate(30);
return Ok(CallToolResult::success(vec![Content::text(format!(
"Tool '{name}' blocked by workflow '{}' (state: {}). Allowed: {}. Use ctx_workflow(action=\"stop\") to exit.",
run.spec.name,
run.current,
shown.join(", ")
))]));
}
}
}
}
}
}
let auto_context = {
let task = {
let session = self.session.read().await;
session.task.as_ref().map(|t| t.description.clone())
};
let project_root = {
let session = self.session.read().await;
session.project_root.clone()
};
let cache_timeout =
tokio::time::timeout(std::time::Duration::from_secs(5), self.cache.write()).await;
if let Ok(mut cache) = cache_timeout {
crate::tools::autonomy::session_lifecycle_pre_hook(
&self.autonomy,
name,
&mut cache,
task.as_deref(),
project_root.as_deref(),
CrpMode::effective(),
)
} else {
tracing::warn!("pre-dispatch: cache write-lock timeout (5s), skipping autonomy");
None
}
};
let args_fp = args
.map(|a| {
crate::core::loop_detection::LoopDetector::fingerprint(&serde_json::Value::Object(
a.clone(),
))
})
.unwrap_or_default();
let throttle_result = {
let fp = &args_fp;
let detector_timeout = tokio::time::timeout(
std::time::Duration::from_secs(3),
self.loop_detector.write(),
)
.await;
if let Ok(mut detector) = detector_timeout {
let is_search = crate::core::loop_detection::LoopDetector::is_search_tool(name);
let is_search_shell = name == "ctx_shell" && {
let cmd = args
.as_ref()
.and_then(|a| a.get("command"))
.and_then(|v| v.as_str())
.unwrap_or("");
crate::core::loop_detection::LoopDetector::is_search_shell_command(cmd)
};
if is_search || is_search_shell {
let search_pattern = args.and_then(|a| {
a.get("pattern")
.or_else(|| a.get("query"))
.and_then(|v| v.as_str())
});
let shell_pattern = if is_search_shell {
args.and_then(|a| a.get("command"))
.and_then(|v| v.as_str())
.and_then(helpers::extract_search_pattern_from_command)
} else {
None
};
let pat = search_pattern.or(shell_pattern.as_deref());
detector.record_search(name, fp, pat)
} else {
detector.record_call(name, fp)
}
} else {
tracing::warn!("pre-dispatch: loop_detector write-lock timeout (3s), skipping");
crate::core::loop_detection::ThrottleResult::default()
}
};
if throttle_result.level == crate::core::loop_detection::ThrottleLevel::Blocked {
let msg = throttle_result.message.unwrap_or_default();
return Ok(CallToolResult::success(vec![Content::text(msg)]));
}
let throttle_warning =
if throttle_result.level == crate::core::loop_detection::ThrottleLevel::Reduced {
throttle_result.message.clone()
} else {
None
};
let config = crate::core::config::Config::load();
let minimal = config.minimal_overhead_effective();
{
use crate::core::budget_tracker::{BudgetLevel, BudgetTracker};
let snap = BudgetTracker::global().check();
if *snap.worst_level() == BudgetLevel::Exhausted
&& name != "ctx_session"
&& name != "ctx_cost"
&& name != "ctx_metrics"
{
for (dim, lvl, used, limit) in [
(
"tokens",
&snap.tokens.level,
format!("{}", snap.tokens.used),
format!("{}", snap.tokens.limit),
),
(
"shell",
&snap.shell.level,
format!("{}", snap.shell.used),
format!("{}", snap.shell.limit),
),
(
"cost",
&snap.cost.level,
format!("${:.2}", snap.cost.used_usd),
format!("${:.2}", snap.cost.limit_usd),
),
] {
if *lvl == BudgetLevel::Exhausted {
crate::core::events::emit_budget_exhausted(&snap.role, dim, &used, &limit);
}
}
let msg = format!(
"[BUDGET EXHAUSTED] {}\n\
Use `ctx_session action=role` to check/switch roles, \
or `ctx_session action=reset` to start fresh.",
snap.format_compact()
);
tracing::warn!(tool = name, "{msg}");
return Ok(CallToolResult::success(vec![Content::text(msg)]));
}
}
if is_shell_tool_name(name) {
crate::core::budget_tracker::BudgetTracker::global().record_shell();
}
let tool_start = std::time::Instant::now();
let (mut result_text, tool_saved_tokens) =
match self.dispatch_tool(name, args, minimal).await {
Ok(pair) => pair,
Err(e) => {
if let Ok(mut detector) = tokio::time::timeout(
std::time::Duration::from_secs(1),
self.loop_detector.write(),
)
.await
{
detector.record_error_outcome(name, &args_fp);
}
return Err(e);
}
};
let is_raw_shell = name == "ctx_shell" && {
let arg_raw = helpers::get_bool(args, "raw").unwrap_or(false);
let arg_bypass = helpers::get_bool(args, "bypass").unwrap_or(false);
arg_raw
|| arg_bypass
|| std::env::var("LEAN_CTX_DISABLED").is_ok()
|| std::env::var("LEAN_CTX_RAW").is_ok()
};
let pre_terse_len = result_text.len();
let output_tokens = {
let tokens = crate::core::tokens::count_tokens(&result_text) as u64;
crate::core::budget_tracker::BudgetTracker::global().record_tokens(tokens);
tokens
};
crate::core::anomaly::record_metric("tokens_per_call", output_tokens as f64);
if let Some(ref ir) = self.context_ir {
let tool_duration = tool_start.elapsed();
let source_kind = match name {
n if n.contains("read") || n.contains("multi_read") || n.contains("smart_read") => {
crate::core::context_ir::ContextIrSourceKindV1::Read
}
"ctx_shell" => crate::core::context_ir::ContextIrSourceKindV1::Shell,
"ctx_search" | "ctx_semantic_search" => {
crate::core::context_ir::ContextIrSourceKindV1::Search
}
"ctx_provider" => crate::core::context_ir::ContextIrSourceKindV1::Provider,
_ => crate::core::context_ir::ContextIrSourceKindV1::Other,
};
let ir_path = helpers::get_str(args, "path");
let ir_command = helpers::get_str(args, "command");
let ir_mode = helpers::get_str(args, "mode");
let excerpt = if result_text.len() > 200 {
let mut end = 200;
while !result_text.is_char_boundary(end) && end > 0 {
end -= 1;
}
&result_text[..end]
} else {
&result_text
};
let input = crate::core::context_ir::RecordIrInput {
kind: source_kind,
tool: name,
client_name: None,
agent_id: None,
path: ir_path.as_deref(),
command: ir_command.as_deref(),
pattern: ir_mode.as_deref(),
input_tokens: pre_terse_len / 4,
output_tokens: output_tokens as usize,
duration: tool_duration,
content_excerpt: excerpt,
};
ir.write().await.record(input);
}
{
let mut detector = self.loop_detector.write().await;
if name == "ctx_read" {
let path = helpers::get_str(args, "path").unwrap_or_default();
let mode = helpers::get_str(args, "mode").unwrap_or_else(|| "auto".into());
let fresh = helpers::get_bool(args, "fresh").unwrap_or(false);
detector.record_read_for_correction(&path, &mode, fresh);
} else if name == "ctx_shell" {
let cmd = helpers::get_str(args, "command").unwrap_or_default();
detector.record_shell_for_correction(&cmd);
}
let correction_count = detector.correction_count();
if correction_count > 0 {
crate::core::anomaly::record_metric(
"correction_loop_rate",
f64::from(correction_count),
);
}
use crate::core::config::CompressionLevel;
if correction_count >= 5 {
CompressionLevel::set_session_degrade(&CompressionLevel::Off);
} else if correction_count >= 3 {
CompressionLevel::set_session_degrade(&CompressionLevel::Lite);
} else if correction_count == 0 {
CompressionLevel::clear_session_degrade();
}
detector.prune_corrections();
}
crate::core::anomaly::save_debounced();
let budget_warning = {
use crate::core::budget_tracker::{BudgetLevel, BudgetTracker};
let snap = BudgetTracker::global().check();
if *snap.worst_level() == BudgetLevel::Warning {
for (dim, lvl, used, limit, pct) in [
(
"tokens",
&snap.tokens.level,
format!("{}", snap.tokens.used),
format!("{}", snap.tokens.limit),
snap.tokens.percent,
),
(
"shell",
&snap.shell.level,
format!("{}", snap.shell.used),
format!("{}", snap.shell.limit),
snap.shell.percent,
),
(
"cost",
&snap.cost.level,
format!("${:.2}", snap.cost.used_usd),
format!("${:.2}", snap.cost.limit_usd),
snap.cost.percent,
),
] {
if *lvl == BudgetLevel::Warning {
crate::core::events::emit_budget_warning(
&snap.role, dim, &used, &limit, pct,
);
}
}
if crate::core::protocol::meta_visible() {
Some(format!("[BUDGET WARNING] {}", snap.format_compact()))
} else {
None
}
} else {
None
}
};
let archive_hint = if minimal || is_raw_shell {
None
} else {
use crate::core::archive;
let archivable = matches!(
name,
"ctx_shell"
| "ctx_read"
| "ctx_multi_read"
| "ctx_smart_read"
| "ctx_execute"
| "ctx_search"
| "ctx_tree"
);
if archivable && archive::should_archive(&result_text) {
let cmd = helpers::get_str(args, "command")
.or_else(|| helpers::get_str(args, "path"))
.unwrap_or_default();
let session_id = self.session.read().await.id.clone();
let to_store = crate::core::redaction::redact_text_if_enabled(&result_text);
let tokens = crate::core::tokens::count_tokens(&to_store);
archive::store(name, &cmd, &to_store, Some(&session_id))
.map(|id| archive::format_hint(&id, to_store.len(), tokens))
} else {
None
}
};
let pre_compression = result_text.clone();
let deeply_compressed = matches!(
name,
"ctx_read" | "ctx_multi_read" | "ctx_smart_read" | "ctx_compress" | "ctx_overview"
);
let skip_terse = is_raw_shell
|| (tool_saved_tokens > 0 && deeply_compressed)
|| (name == "ctx_shell"
&& helpers::get_str(args, "command")
.is_some_and(|c| crate::shell::compress::has_structural_output(&c)));
let compression = crate::core::config::CompressionLevel::effective(&config);
if compression.is_active() && !skip_terse {
let terse_result =
crate::core::terse::pipeline::compress(&result_text, &compression, None);
if terse_result.quality_passed && terse_result.savings_pct >= 3.0 {
result_text = terse_result.output;
}
}
let profile_hints = crate::core::profiles::active_profile().output_hints;
if !is_raw_shell && profile_hints.verify_footer() {
let verify_cfg = crate::core::profiles::active_profile().verification;
let vr = crate::core::output_verification::verify_output(
&pre_compression,
&result_text,
&verify_cfg,
);
if !vr.warnings.is_empty() {
let msg = format!("[VERIFY] {}", vr.format_compact());
result_text = format!("{result_text}\n\n{msg}");
}
}
if profile_hints.archive_hint() {
if let Some(hint) = archive_hint {
result_text = format!("{result_text}\n{hint}");
}
}
if !is_raw_shell {
if let Some(ctx) = auto_context {
let ctx_tokens = crate::core::tokens::count_tokens(&ctx);
if ctx_tokens <= 400 {
result_text = format!("{ctx}\n\n{result_text}");
}
}
}
if let Some(warning) = throttle_warning {
result_text = format!("{result_text}\n\n{warning}");
}
if let Some(bw) = budget_warning {
result_text = format!("{result_text}\n\n{bw}");
}
if !self
.rules_stale_checked
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
let client = self.client_name.read().await.clone();
if !client.is_empty() && crate::rules_inject::check_rules_freshness(&client).is_some() {
let _ = tokio::task::spawn_blocking(|| {
if let Some(home) = dirs::home_dir() {
let _ = crate::rules_inject::inject_all_rules(&home);
}
})
.await;
result_text = format!(
"{result_text}\n\n[RULES AUTO-UPDATED] Your lean-ctx rules were written by \
an older version and have been refreshed on disk. Start a new session to \
load them for full compatibility."
);
} else if !self
.rules_tip_shown
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
let cfg = crate::core::config::Config::load();
if !cfg.setup.should_inject_rules() {
result_text = format!(
"{result_text}\n\n\
--- tip: run 'lean-ctx setup --inject-rules' for optimal AI integration ---"
);
}
}
}
{
let _ = crate::core::slo::evaluate();
}
if name == "ctx_read" {
if minimal {
let cache_clone = self.cache.clone();
let autonomy_clone = self.autonomy.clone();
let name_owned = name.to_string();
tokio::spawn(async move {
let result = std::panic::AssertUnwindSafe(async {
let mut cache = cache_clone.write().await;
crate::tools::autonomy::maybe_auto_dedup(
&autonomy_clone,
&mut cache,
&name_owned,
);
})
.catch_unwind()
.await;
if let Err(e) = result {
let msg = e
.downcast_ref::<String>()
.map(String::as_str)
.or_else(|| e.downcast_ref::<&str>().copied())
.unwrap_or("unknown");
tracing::error!("background auto_dedup panicked: {msg}");
}
});
} else {
let read_path = self
.resolve_path_or_passthrough(
&helpers::get_str(args, "path").unwrap_or_default(),
)
.await;
let project_root = {
let session = self.session.read().await;
session.project_root.clone()
};
let enrich_timeout =
tokio::time::timeout(std::time::Duration::from_secs(3), self.cache.write())
.await;
if let Ok(mut cache) = enrich_timeout {
let enrich = crate::tools::autonomy::enrich_after_read(
&self.autonomy,
&mut cache,
&read_path,
project_root.as_deref(),
None,
crate::tools::CrpMode::effective(),
false,
);
if profile_hints.related_hint() {
if let Some(hint) = enrich.related_hint {
result_text = format!("{result_text}\n{hint}");
}
}
crate::tools::autonomy::maybe_auto_dedup(&self.autonomy, &mut cache, name);
} else {
tracing::warn!(
"post-dispatch cache lock timeout (3s) for {read_path}, skipping enrichment"
);
}
let ledger_clone = self.ledger.clone();
let session_clone = self.session.clone();
let peer_clone = self.peer.clone();
let read_path_owned = read_path.clone();
let project_root_owned = project_root.clone();
let mode_used =
helpers::get_str(args, "mode").unwrap_or_else(|| "auto".to_string());
let out_tok = output_tokens as usize;
let sent_tok = crate::core::tokens::count_tokens(&result_text);
let wants_eviction = true;
let wants_elicitation = profile_hints.elicitation_hint();
tokio::spawn(async move {
let result = std::panic::AssertUnwindSafe(async {
let active_task = {
let session = session_clone.read().await;
session.task.as_ref().map(|t| t.description.clone())
};
let mut ledger = ledger_clone.write().await;
let overlay = crate::core::context_overlay::OverlayStore::load_project(
&std::path::PathBuf::from(project_root_owned.as_deref().unwrap_or(".")),
);
let gate_result = context_gate::post_dispatch_record_with_task(
&read_path_owned,
&mode_used,
out_tok,
sent_tok,
&mut ledger,
&overlay,
active_task.as_deref(),
);
drop(ledger);
if wants_eviction {
if let Some(hint) = &gate_result.eviction_hint {
tracing::debug!("deferred eviction hint: {hint}");
}
}
if wants_elicitation {
if let Some(hint) = &gate_result.elicitation_hint {
tracing::debug!("deferred elicitation hint: {hint}");
}
}
if gate_result.resource_changed {
if let Some(peer) = peer_clone.read().await.as_ref() {
notifications::send_resource_updated(
peer,
notifications::RESOURCE_URI_SUMMARY,
)
.await;
}
}
})
.catch_unwind()
.await;
if let Err(e) = result {
let msg = e
.downcast_ref::<String>()
.map(String::as_str)
.or_else(|| e.downcast_ref::<&str>().copied())
.unwrap_or("unknown");
tracing::error!("background post_dispatch panicked: {msg}");
}
});
}
}
if !minimal && !is_raw_shell && name == "ctx_shell" {
let cmd = helpers::get_str(args, "command").unwrap_or_default();
if let Some(file_path) = extract_file_read_from_shell(&cmd) {
if let Ok(mut bt) = crate::core::bounce_tracker::global().lock() {
bt.next_seq();
bt.record_shell_file_access(&file_path);
}
}
if profile_hints.efficiency_hint() {
let calls = self.tool_calls.read().await;
let last_original = calls.last().map_or(0, |c| c.original_tokens);
drop(calls);
let pre_hint_tokens = crate::core::tokens::count_tokens(&result_text);
if let Some(hint) = crate::tools::autonomy::shell_efficiency_hint(
&self.autonomy,
&cmd,
last_original,
pre_hint_tokens,
) {
result_text = format!("{result_text}\n{hint}");
}
}
}
if !minimal && !is_raw_shell {
if let Ok(data_dir) = crate::core::data_dir::lean_ctx_data_dir() {
let session = self.session.read().await;
bypass_hint::set_session_id(&session.id);
drop(session);
if let Some(hint) = bypass_hint::check(&data_dir) {
result_text = format!("{result_text}\n{hint}");
}
}
bypass_hint::record_lctx_call();
}
if let Some(finding) = crate::core::auto_findings::extract(name, &result_text) {
let mut session = self.session.write().await;
session.add_finding(finding.file.as_deref(), None, &finding.summary);
let project_root = session.project_root.clone();
drop(session);
if let Some(ref root) = project_root {
let f = finding.clone();
let r = root.clone();
std::thread::spawn(move || {
crate::core::auto_capture::capture_finding(&r, &f);
});
}
}
if let Some(extra) = crate::core::auto_capture::extract_extra(name, &result_text) {
let session = self.session.read().await;
let project_root = session.project_root.clone();
drop(session);
if let Some(ref root) = project_root {
let e = extra.clone();
let r = root.clone();
std::thread::spawn(move || {
crate::core::auto_capture::capture_finding(&r, &e);
});
}
}
{
let tool_name = name.to_string();
let summary = result_text.lines().next().unwrap_or("").to_string();
std::thread::spawn(move || {
crate::core::journal::maybe_day_separator();
crate::core::journal::log_tool_call(&tool_name, &summary);
});
}
#[allow(clippy::cast_possible_truncation)]
let output_token_count = if result_text.len() == pre_terse_len {
output_tokens as usize
} else {
crate::core::tokens::count_tokens(&result_text)
};
if result_text.len() != pre_terse_len && tool_saved_tokens > 0 {
let pre_savings = tool_saved_tokens;
let actual_sent = output_token_count;
let original = actual_sent + pre_savings;
let actual_savings = original.saturating_sub(actual_sent);
if actual_savings != pre_savings {
let delta = pre_savings as i64 - actual_savings as i64;
if delta != 0 {
crate::core::stats::adjust_savings(name, delta);
}
}
}
let action = helpers::get_str(args, "action");
const K_STALENESS_BOUND: i64 = 10;
if self.session_mode == crate::tools::SessionMode::Shared {
if let Some(ref rt) = self.context_os {
let latest = rt.bus.latest_id(&self.workspace_id, &self.channel_id);
let cursor = self
.last_seen_event_id
.load(std::sync::atomic::Ordering::Relaxed);
if cursor > 0 && latest - cursor > K_STALENESS_BOUND {
let gap = latest - cursor;
result_text = format!(
"[CONTEXT STALE] {gap} events happened since your last read. \
Use ctx_session(action=\"status\") to sync.\n\n{result_text}"
);
}
self.last_seen_event_id
.store(latest, std::sync::atomic::Ordering::Relaxed);
}
}
{
let input = helpers::canonical_args_string(args);
let input_md5 = helpers::hash_fast(&input);
let output_md5 = helpers::hash_fast(&result_text);
let agent_id = self.agent_id.read().await.clone();
let client_name = self.client_name.read().await.clone();
let mut explicit_intent: Option<(
crate::core::intent_protocol::IntentRecord,
Option<String>,
String,
)> = None;
let pending_session_save = {
let empty_args = serde_json::Map::new();
let args_map = args.unwrap_or(&empty_args);
let mut session = self.session.write().await;
session.record_tool_receipt(
name,
action.as_deref(),
&input_md5,
&output_md5,
agent_id.as_deref(),
Some(&client_name),
);
if let Some(intent) = crate::core::intent_protocol::infer_from_tool_call(
name,
action.as_deref(),
args_map,
session.project_root.as_deref(),
) {
let is_explicit =
intent.source == crate::core::intent_protocol::IntentSource::Explicit;
let root = session.project_root.clone();
let sid = session.id.clone();
session.record_intent(intent.clone());
if is_explicit {
explicit_intent = Some((intent, root, sid));
}
}
if session.should_save() {
session.prepare_save().ok()
} else {
None
}
};
if let Some(prepared) = pending_session_save {
let ir_clone = self.context_ir.clone();
tokio::task::spawn_blocking(move || {
let _ = prepared.write_to_disk();
if let Some(ir) = ir_clone {
if let Ok(ir_guard) = ir.try_read() {
ir_guard.save();
}
}
});
}
if let Some((intent, root, session_id)) = explicit_intent {
let _ = crate::core::intent_protocol::apply_side_effects(
&intent,
root.as_deref(),
&session_id,
);
}
if self.autonomy.is_enabled() {
let (calls, project_root) = {
let session = self.session.read().await;
(session.stats.total_tool_calls, session.project_root.clone())
};
if let Some(root) = project_root {
if crate::tools::autonomy::should_auto_consolidate(&self.autonomy, calls) {
let root_clone = root.clone();
tokio::task::spawn_blocking(move || {
let _ = crate::core::consolidation_engine::consolidate_latest(
&root_clone,
crate::core::consolidation_engine::ConsolidationBudgets::default(),
);
});
}
}
}
let agent_key = agent_id.unwrap_or_else(|| "unknown".to_string());
let input_token_count = crate::core::tokens::count_tokens(&input) as u64;
let output_token_count_u64 = output_token_count as u64;
let name_owned = name.to_string();
tokio::task::spawn_blocking(move || {
let pricing = crate::core::gain::model_pricing::ModelPricing::load();
let quote = pricing.quote_from_env_or_agent_type(&client_name);
let cost_usd =
quote
.cost
.estimate_usd(input_token_count, output_token_count_u64, 0, 0);
crate::core::budget_tracker::BudgetTracker::global().record_cost_usd(cost_usd);
let mut store = crate::core::a2a::cost_attribution::CostStore::load();
store.record_tool_call(
&agent_key,
&client_name,
&name_owned,
input_token_count,
output_token_count_u64,
0,
);
if let Err(e) = store.save() {
tracing::warn!("lean-ctx: failed to persist cost attribution: {e}");
}
});
}
if self.session_mode == crate::tools::SessionMode::Shared
&& name == "ctx_knowledge"
&& action.as_deref() == Some("remember")
{
if let Some(ref rt) = self.context_os {
let my_agent = self.agent_id.read().await.clone();
let category = helpers::get_str(args, "category");
let key = helpers::get_str(args, "key");
if let (Some(ref cat), Some(ref k)) = (&category, &key) {
let recent = rt.bus.recent_by_kind(
&self.workspace_id,
&self.channel_id,
"knowledge_remembered",
20,
);
for ev in &recent {
let p = &ev.payload;
let ev_cat = p.get("category").and_then(|v| v.as_str());
let ev_key = p.get("key").and_then(|v| v.as_str());
let ev_actor = ev.actor.as_deref();
if ev_cat == Some(cat.as_str())
&& ev_key == Some(k.as_str())
&& ev_actor != my_agent.as_deref()
{
let other = ev_actor.unwrap_or("unknown");
result_text = format!(
"[CONFLICT] Agent '{other}' recently wrote to the same knowledge key \
'{cat}/{k}'. Review before proceeding.\n\n{result_text}"
);
break;
}
}
}
}
}
if self.session_mode == crate::tools::SessionMode::Shared {
let ws = self.workspace_id.clone();
let ch = self.channel_id.clone();
let rt = self.context_os.clone();
let agent = self.agent_id.read().await.clone();
let tool = name.to_string();
let tool_action = action.clone();
let tool_path = helpers::get_str(args, "path");
let tool_category = helpers::get_str(args, "category");
let tool_key = helpers::get_str(args, "key");
let session_snapshot = self.session.read().await.clone();
let session_task = session_snapshot.task.clone();
tokio::task::spawn_blocking(move || {
let Some(rt) = rt else {
return;
};
let Some(root) = session_snapshot.project_root.as_deref() else {
return;
};
rt.shared_sessions
.persist_best_effort(root, &ws, &ch, &session_snapshot);
rt.metrics.record_session_persisted();
let mut base_payload = serde_json::json!({
"tool": tool,
"action": tool_action,
});
if let Some(ref p) = tool_path {
base_payload["path"] = serde_json::Value::String(p.clone());
}
if let Some(ref c) = tool_category {
base_payload["category"] = serde_json::Value::String(c.clone());
}
if let Some(ref k) = tool_key {
base_payload["key"] = serde_json::Value::String(k.clone());
}
if let Some(ref t) = session_task {
base_payload["reasoning"] = serde_json::Value::String(t.description.clone());
}
if rt
.bus
.append(
&ws,
&ch,
&crate::core::context_os::ContextEventKindV1::ToolCallRecorded,
agent.as_deref(),
base_payload.clone(),
)
.is_some()
{
rt.metrics.record_event_appended();
rt.metrics.record_event_broadcast();
}
if let Some(secondary) =
crate::core::context_os::secondary_event_kind(&tool, tool_action.as_deref())
{
if rt
.bus
.append(&ws, &ch, &secondary, agent.as_deref(), base_payload)
.is_some()
{
rt.metrics.record_event_appended();
rt.metrics.record_event_broadcast();
}
}
});
}
let skip_checkpoint = minimal
|| matches!(
name,
"ctx_compress"
| "ctx_metrics"
| "ctx_benchmark"
| "ctx_analyze"
| "ctx_cache"
| "ctx_discover"
| "ctx_dedup"
| "ctx_session"
| "ctx_knowledge"
| "ctx_agent"
| "ctx_share"
| "ctx_gain"
| "ctx_overview"
| "ctx_preload"
| "ctx_cost"
| "ctx_heatmap"
| "ctx_task"
| "ctx_impact"
| "ctx_architecture"
| "ctx_smells"
| "ctx_workflow"
);
if !skip_checkpoint && self.increment_and_check() {
if let Some(checkpoint) = self.auto_checkpoint().await {
let interval = LeanCtxServer::checkpoint_interval_effective();
let hints = crate::core::profiles::active_profile().output_hints;
if hints.checkpoint_in_output() && crate::core::protocol::meta_visible() {
let combined = format!(
"{result_text}\n\n--- AUTO CHECKPOINT (every {interval} calls) ---\n{checkpoint}"
);
return Ok(CallToolResult::success(vec![Content::text(combined)]));
}
}
}
let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
if tool_duration_ms > 100 {
LeanCtxServer::append_tool_call_log(
name,
tool_duration_ms,
0,
0,
None,
&chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string(),
);
}
let current_count = self.call_count.load(std::sync::atomic::Ordering::Relaxed);
if current_count > 0 && current_count.is_multiple_of(100) {
std::thread::spawn(crate::cloud_sync::cloud_background_tasks);
}
Ok(CallToolResult::success(vec![Content::text(result_text)]))
}
async fn resolve_roots_once(&self) {
use std::sync::atomic::Ordering;
if !self.has_client_roots.load(Ordering::Relaxed) {
return;
}
if self.roots_resolved.swap(true, Ordering::Relaxed) {
return;
}
let peer_guard = self.peer.read().await;
let Some(peer) = peer_guard.as_ref() else {
return;
};
let list_result = match peer.list_roots().await {
Ok(r) => r,
Err(e) => {
tracing::warn!("roots/list failed: {e}");
return;
}
};
drop(peer_guard);
let uris: Vec<String> = list_result.roots.iter().map(|r| r.uri.clone()).collect();
let validated_paths = roots::valid_dir_paths_from_uris(&uris);
let Some(new_root) = roots::best_root_from_uris(&uris) else {
return;
};
if crate::core::pathutil::is_broad_or_unsafe_root(std::path::Path::new(&new_root)) {
tracing::warn!("MCP roots: ignoring unsafe project root {new_root}");
return;
}
let mut session = self.session.write().await;
let old_root = session.project_root.clone();
let other_roots: Vec<String> = validated_paths
.iter()
.filter(|p| p.as_str() != new_root)
.cloned()
.collect();
if !other_roots.is_empty() {
session.extra_roots = other_roots;
tracing::info!(
"MCP roots: {} extra root(s) registered",
session.extra_roots.len()
);
}
if old_root.as_deref() == Some(&new_root) {
let _ = session.save();
return;
}
tracing::info!(
"MCP roots: switching project root from {:?} to {new_root}",
old_root
);
if let Some(existing) =
crate::core::session::SessionState::load_latest_for_project_root(&new_root)
{
*session = existing;
session.extra_roots = validated_paths
.iter()
.filter(|p| p.as_str() != new_root)
.cloned()
.collect();
}
session.project_root = Some(new_root.clone());
let extra = session.extra_roots.clone();
let _ = session.save();
drop(session);
crate::core::index_orchestrator::ensure_all_background(&new_root);
if !extra.is_empty() {
let r = new_root;
std::thread::spawn(move || {
crate::core::index_orchestrator::ensure_extra_roots_background(&r, &extra);
});
}
}
}