use std::sync::atomic::Ordering;
use super::server::{CepComputedStats, CrpMode, LeanCtxServer, ToolCallRecord};
use super::startup::auto_consolidate_knowledge;
use super::{ctx_compress, ctx_share};
impl LeanCtxServer {
pub async fn record_call(
&self,
tool: &str,
original: usize,
saved: usize,
mode: Option<String>,
) {
self.record_call_with_timing(tool, original, saved, mode, 0)
.await;
}
pub async fn record_call_with_path(
&self,
tool: &str,
original: usize,
saved: usize,
mode: Option<String>,
path: Option<&str>,
) {
self.record_call_with_timing_inner(tool, original, saved, mode, 0, path)
.await;
}
pub async fn record_call_with_timing(
&self,
tool: &str,
original: usize,
saved: usize,
mode: Option<String>,
duration_ms: u64,
) {
self.record_call_with_timing_inner(tool, original, saved, mode, duration_ms, None)
.await;
}
async fn record_call_with_timing_inner(
&self,
tool: &str,
original: usize,
saved: usize,
mode: Option<String>,
duration_ms: u64,
path: Option<&str>,
) {
let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
let mut calls = self.tool_calls.write().await;
calls.push(ToolCallRecord {
tool: tool.to_string(),
original_tokens: original,
saved_tokens: saved,
mode: mode.clone(),
duration_ms,
timestamp: ts.clone(),
});
const MAX_TOOL_CALL_RECORDS: usize = 500;
if calls.len() > MAX_TOOL_CALL_RECORDS {
let excess = calls.len() - MAX_TOOL_CALL_RECORDS;
calls.drain(..excess);
}
if duration_ms > 0 {
Self::append_tool_call_log(tool, duration_ms, original, saved, mode.as_deref(), &ts);
}
crate::core::events::emit_tool_call(
tool,
original as u64,
saved as u64,
mode.clone(),
duration_ms,
path.map(ToString::to_string),
);
let output_tokens = original.saturating_sub(saved);
crate::core::stats::record(tool, original, output_tokens);
let mut session = self.session.write().await;
session.record_tool_call(saved as u64, original as u64);
if tool == "ctx_shell" {
session.record_command();
}
let pending_save = if session.should_save() {
session.prepare_save().ok()
} else {
None
};
drop(calls);
drop(session);
if let Some(prepared) = pending_save {
tokio::task::spawn_blocking(move || {
let _ = prepared.write_to_disk();
});
}
self.write_mcp_live_stats().await;
}
pub async fn is_prompt_cache_stale(&self) -> bool {
let last = *self.last_call.read().await;
last.elapsed().as_secs() > 3600
}
pub fn upgrade_mode_if_stale(mode: &str, stale: bool) -> &str {
if !stale {
return mode;
}
match mode {
"full" => "full",
"map" => "signatures",
m => m,
}
}
pub fn increment_and_check(&self) -> bool {
let count = self.call_count.fetch_add(1, Ordering::Relaxed) + 1;
let interval = Self::checkpoint_interval_effective();
interval > 0 && count.is_multiple_of(interval)
}
pub async fn auto_checkpoint(&self) -> Option<String> {
let cache = self.cache.read().await;
if cache.get_all_entries().is_empty() {
return None;
}
let complexity = crate::core::adaptive::classify_from_context(&cache);
let checkpoint = ctx_compress::handle(&cache, false, CrpMode::effective());
drop(cache);
let mut session = self.session.write().await;
let _ = session.save();
let session_summary = session.format_compact();
let has_insights = !session.findings.is_empty() || !session.decisions.is_empty();
let project_root = session.project_root.clone();
drop(session);
if has_insights {
if let Some(ref root) = project_root {
let root = root.clone();
std::thread::spawn(move || {
auto_consolidate_knowledge(&root);
});
}
}
let multi_agent_block = self
.auto_multi_agent_checkpoint(project_root.as_ref())
.await;
self.record_call("ctx_compress", 0, 0, Some("auto".to_string()))
.await;
self.record_cep_snapshot().await;
Some(format!(
"{checkpoint}\n\n--- SESSION STATE ---\n{session_summary}\n\n{}{multi_agent_block}",
complexity.instruction_suffix()
))
}
async fn auto_multi_agent_checkpoint(&self, project_root: Option<&String>) -> String {
let Some(root) = project_root else {
return String::new();
};
let registry = crate::core::agents::AgentRegistry::load_or_create();
let active = registry.list_active(Some(root));
if active.len() <= 1 {
return String::new();
}
let agent_id = self.agent_id.read().await;
let my_id = match agent_id.as_deref() {
Some(id) => id.to_string(),
None => return String::new(),
};
drop(agent_id);
let cache = self.cache.read().await;
let entries = cache.get_all_entries();
if !entries.is_empty() {
let mut by_access: Vec<_> = entries.iter().collect();
by_access.sort_by_key(|x| std::cmp::Reverse(x.1.read_count));
let top_paths: Vec<&str> = by_access
.iter()
.take(5)
.map(|(key, _)| key.as_str())
.collect();
let paths_csv = top_paths.join(",");
let _ = ctx_share::handle(
"push",
Some(&my_id),
None,
Some(&paths_csv),
None,
&cache,
root,
);
}
drop(cache);
let pending_count = registry
.scratchpad
.iter()
.filter(|e| !e.read_by.contains(&my_id) && e.from_agent != my_id)
.count();
let shared_dir = crate::core::data_dir::lean_ctx_data_dir()
.unwrap_or_default()
.join("agents")
.join("shared");
let shared_count = if shared_dir.exists() {
std::fs::read_dir(&shared_dir).map_or(0, std::iter::Iterator::count)
} else {
0
};
let agent_names: Vec<String> = active
.iter()
.map(|a| {
let role = a.role.as_deref().unwrap_or(&a.agent_type);
format!("{role}({})", &a.agent_id[..8.min(a.agent_id.len())])
})
.collect();
format!(
"\n\n--- MULTI-AGENT SYNC ---\nAgents: {} | Pending msgs: {} | Shared contexts: {}\nAuto-shared top-5 cached files.\n--- END SYNC ---",
agent_names.join(", "),
pending_count,
shared_count,
)
}
pub fn append_tool_call_log(
tool: &str,
duration_ms: u64,
original: usize,
saved: usize,
mode: Option<&str>,
timestamp: &str,
) {
const MAX_LOG_LINES: usize = 50;
if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
let log_path = dir.join("tool-calls.log");
let mode_str = mode.unwrap_or("-");
let slow = if duration_ms > 5000 { " **SLOW**" } else { "" };
let line = format!(
"{timestamp}\t{tool}\t{duration_ms}ms\torig={original}\tsaved={saved}\tmode={mode_str}{slow}\n"
);
let mut lines: Vec<String> = std::fs::read_to_string(&log_path)
.unwrap_or_default()
.lines()
.map(std::string::ToString::to_string)
.collect();
lines.push(line.trim_end().to_string());
if lines.len() > MAX_LOG_LINES {
lines.drain(0..lines.len() - MAX_LOG_LINES);
}
let _ = std::fs::write(&log_path, lines.join("\n") + "\n");
}
}
fn compute_cep_stats(
calls: &[ToolCallRecord],
stats: &crate::core::cache::CacheStats,
complexity: &crate::core::adaptive::TaskComplexity,
) -> CepComputedStats {
let total_original: u64 = calls.iter().map(|c| c.original_tokens as u64).sum();
let total_saved: u64 = calls.iter().map(|c| c.saved_tokens as u64).sum();
let total_compressed = total_original.saturating_sub(total_saved);
let compression_rate = if total_original > 0 {
total_saved as f64 / total_original as f64
} else {
0.0
};
let modes_used: std::collections::HashSet<&str> =
calls.iter().filter_map(|c| c.mode.as_deref()).collect();
let mode_diversity = (modes_used.len() as f64 / 10.0).min(1.0);
let cache_util = stats.hit_rate() / 100.0;
let cep_score = cache_util * 0.3 + mode_diversity * 0.2 + compression_rate * 0.5;
let mut mode_counts: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
for call in calls {
if let Some(ref mode) = call.mode {
*mode_counts.entry(mode.clone()).or_insert(0) += 1;
}
}
CepComputedStats {
cep_score: (cep_score * 100.0).round() as u32,
cache_util: (cache_util * 100.0).round() as u32,
mode_diversity: (mode_diversity * 100.0).round() as u32,
compression_rate: (compression_rate * 100.0).round() as u32,
total_original,
total_compressed,
total_saved,
mode_counts,
complexity: format!("{complexity:?}"),
cache_hits: stats.cache_hits,
total_reads: stats.total_reads,
tool_call_count: calls.len() as u64,
}
}
async fn write_mcp_live_stats(&self) {
let count = self.call_count.load(Ordering::Relaxed);
if count > 1 && !count.is_multiple_of(5) {
return;
}
let cache = self.cache.read().await;
let calls = self.tool_calls.read().await;
let stats = cache.get_stats();
let complexity = crate::core::adaptive::classify_from_context(&cache);
let cs = Self::compute_cep_stats(&calls, stats, &complexity);
let started_at = calls
.first()
.map(|c| c.timestamp.clone())
.unwrap_or_default();
drop(cache);
drop(calls);
let live = serde_json::json!({
"cep_score": cs.cep_score,
"cache_utilization": cs.cache_util,
"mode_diversity": cs.mode_diversity,
"compression_rate": cs.compression_rate,
"task_complexity": cs.complexity,
"files_cached": cs.total_reads,
"total_reads": cs.total_reads,
"cache_hits": cs.cache_hits,
"tokens_saved": cs.total_saved,
"tokens_original": cs.total_original,
"tool_calls": cs.tool_call_count,
"started_at": started_at,
"updated_at": chrono::Local::now().to_rfc3339(),
});
if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
let _ = std::fs::write(dir.join("mcp-live.json"), live.to_string());
}
}
pub async fn record_cep_snapshot(&self) {
let cache = self.cache.read().await;
let calls = self.tool_calls.read().await;
let stats = cache.get_stats();
let complexity = crate::core::adaptive::classify_from_context(&cache);
let cs = Self::compute_cep_stats(&calls, stats, &complexity);
drop(cache);
drop(calls);
crate::core::stats::record_cep_session(
cs.cep_score,
cs.cache_hits,
cs.total_reads,
cs.total_original,
cs.total_compressed,
&cs.mode_counts,
cs.tool_call_count,
&cs.complexity,
);
}
}