Skip to main content

lean_ctx/tools/
server_metrics.rs

1use std::sync::atomic::Ordering;
2
3use super::server::{CepComputedStats, CrpMode, LeanCtxServer, ToolCallRecord};
4use super::startup::auto_consolidate_knowledge;
5use super::{ctx_compress, ctx_share};
6
7impl LeanCtxServer {
8    /// Records a tool call's token savings without timing information.
9    pub async fn record_call(
10        &self,
11        tool: &str,
12        original: usize,
13        saved: usize,
14        mode: Option<String>,
15    ) {
16        self.record_call_with_timing(tool, original, saved, mode, 0)
17            .await;
18    }
19
20    /// Records a tool call like `record_call`, but includes an optional file path for observability.
21    pub async fn record_call_with_path(
22        &self,
23        tool: &str,
24        original: usize,
25        saved: usize,
26        mode: Option<String>,
27        path: Option<&str>,
28    ) {
29        self.record_call_with_timing_inner(tool, original, saved, mode, 0, path)
30            .await;
31    }
32
33    /// Records a tool call's token savings, duration, and emits events and stats.
34    pub async fn record_call_with_timing(
35        &self,
36        tool: &str,
37        original: usize,
38        saved: usize,
39        mode: Option<String>,
40        duration_ms: u64,
41    ) {
42        self.record_call_with_timing_inner(tool, original, saved, mode, duration_ms, None)
43            .await;
44    }
45
46    async fn record_call_with_timing_inner(
47        &self,
48        tool: &str,
49        original: usize,
50        saved: usize,
51        mode: Option<String>,
52        duration_ms: u64,
53        path: Option<&str>,
54    ) {
55        let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
56        let mut calls = self.tool_calls.write().await;
57        calls.push(ToolCallRecord {
58            tool: tool.to_string(),
59            original_tokens: original,
60            saved_tokens: saved,
61            mode: mode.clone(),
62            duration_ms,
63            timestamp: ts.clone(),
64        });
65
66        const MAX_TOOL_CALL_RECORDS: usize = 500;
67        if calls.len() > MAX_TOOL_CALL_RECORDS {
68            let excess = calls.len() - MAX_TOOL_CALL_RECORDS;
69            calls.drain(..excess);
70        }
71
72        if duration_ms > 0 {
73            Self::append_tool_call_log(tool, duration_ms, original, saved, mode.as_deref(), &ts);
74        }
75
76        crate::core::events::emit_tool_call(
77            tool,
78            original as u64,
79            saved as u64,
80            mode.clone(),
81            duration_ms,
82            path.map(ToString::to_string),
83        );
84
85        let output_tokens = original.saturating_sub(saved);
86        crate::core::stats::record(tool, original, output_tokens);
87
88        let mut session = self.session.write().await;
89        session.record_tool_call(saved as u64, original as u64);
90        if tool == "ctx_shell" {
91            session.record_command();
92        }
93        let pending_save = if session.should_save() {
94            session.prepare_save().ok()
95        } else {
96            None
97        };
98        drop(calls);
99        drop(session);
100
101        if let Some(prepared) = pending_save {
102            tokio::task::spawn_blocking(move || {
103                let _ = prepared.write_to_disk();
104            });
105        }
106
107        self.write_mcp_live_stats().await;
108    }
109
110    /// Increments the call counter and returns true if a checkpoint is due.
111    pub fn increment_and_check(&self) -> bool {
112        let count = self.call_count.fetch_add(1, Ordering::Relaxed) + 1;
113        let interval = Self::checkpoint_interval_effective();
114        interval > 0 && count.is_multiple_of(interval)
115    }
116
117    /// Generates a compressed context checkpoint with session state and multi-agent sync.
118    pub async fn auto_checkpoint(&self) -> Option<String> {
119        let cache = self.cache.read().await;
120        if cache.get_all_entries().is_empty() {
121            return None;
122        }
123        let complexity = crate::core::adaptive::classify_from_context(&cache);
124        let checkpoint = ctx_compress::handle(&cache, false, CrpMode::effective());
125        drop(cache);
126
127        let mut session = self.session.write().await;
128        let _ = session.save();
129        let session_summary = session.format_compact();
130        let has_insights = !session.findings.is_empty() || !session.decisions.is_empty();
131        let project_root = session.project_root.clone();
132        drop(session);
133
134        if has_insights {
135            if let Some(ref root) = project_root {
136                let root = root.clone();
137                std::thread::spawn(move || {
138                    auto_consolidate_knowledge(&root);
139                });
140            }
141        }
142
143        let multi_agent_block = self
144            .auto_multi_agent_checkpoint(project_root.as_ref())
145            .await;
146
147        self.record_call("ctx_compress", 0, 0, Some("auto".to_string()))
148            .await;
149
150        self.record_cep_snapshot().await;
151
152        if !crate::core::protocol::meta_visible() {
153            return None;
154        }
155
156        Some(format!(
157            "{checkpoint}\n\n--- SESSION STATE ---\n{session_summary}\n\n{}{multi_agent_block}",
158            complexity.instruction_suffix()
159        ))
160    }
161
162    async fn auto_multi_agent_checkpoint(&self, project_root: Option<&String>) -> String {
163        let Some(root) = project_root else {
164            return String::new();
165        };
166
167        let registry = crate::core::agents::AgentRegistry::load_or_create();
168        let active = registry.list_active(Some(root));
169        if active.len() <= 1 {
170            return String::new();
171        }
172
173        let agent_id = self.agent_id.read().await;
174        let my_id = match agent_id.as_deref() {
175            Some(id) => id.to_string(),
176            None => return String::new(),
177        };
178        drop(agent_id);
179
180        let cache = self.cache.read().await;
181        let entries = cache.get_all_entries();
182        if !entries.is_empty() {
183            let mut by_access: Vec<_> = entries.iter().collect();
184            by_access.sort_by_key(|x| std::cmp::Reverse(x.1.read_count));
185            let top_paths: Vec<&str> = by_access
186                .iter()
187                .take(5)
188                .map(|(key, _)| key.as_str())
189                .collect();
190            let paths_csv = top_paths.join(",");
191
192            let _ = ctx_share::handle(
193                "push",
194                Some(&my_id),
195                None,
196                Some(&paths_csv),
197                None,
198                &cache,
199                root,
200            );
201        }
202        drop(cache);
203
204        let pending_count = registry
205            .scratchpad
206            .iter()
207            .filter(|e| !e.read_by.contains(&my_id) && e.from_agent != my_id)
208            .count();
209
210        let shared_dir = crate::core::data_dir::lean_ctx_data_dir()
211            .unwrap_or_default()
212            .join("agents")
213            .join("shared");
214        let shared_count = if shared_dir.exists() {
215            std::fs::read_dir(&shared_dir).map_or(0, std::iter::Iterator::count)
216        } else {
217            0
218        };
219
220        let agent_names: Vec<String> = active
221            .iter()
222            .map(|a| {
223                let role = a.role.as_deref().unwrap_or(&a.agent_type);
224                format!("{role}({})", &a.agent_id[..8.min(a.agent_id.len())])
225            })
226            .collect();
227
228        format!(
229            "\n\n--- MULTI-AGENT SYNC ---\nAgents: {} | Pending msgs: {} | Shared contexts: {}\nAuto-shared top-5 cached files.\n--- END SYNC ---",
230            agent_names.join(", "),
231            pending_count,
232            shared_count,
233        )
234    }
235
236    /// Appends a tool call entry to the rotating `tool-calls.log` file.
237    pub fn append_tool_call_log(
238        tool: &str,
239        duration_ms: u64,
240        original: usize,
241        saved: usize,
242        mode: Option<&str>,
243        timestamp: &str,
244    ) {
245        const MAX_LOG_LINES: usize = 50;
246        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
247            let log_path = dir.join("tool-calls.log");
248            let mode_str = mode.unwrap_or("-");
249            let slow = if duration_ms > 5000 { " **SLOW**" } else { "" };
250            let line = format!(
251                "{timestamp}\t{tool}\t{duration_ms}ms\torig={original}\tsaved={saved}\tmode={mode_str}{slow}\n"
252            );
253
254            let mut lines: Vec<String> = std::fs::read_to_string(&log_path)
255                .unwrap_or_default()
256                .lines()
257                .map(std::string::ToString::to_string)
258                .collect();
259
260            lines.push(line.trim_end().to_string());
261            if lines.len() > MAX_LOG_LINES {
262                lines.drain(0..lines.len() - MAX_LOG_LINES);
263            }
264
265            let _ = std::fs::write(&log_path, lines.join("\n") + "\n");
266        }
267    }
268
269    fn compute_cep_stats(
270        calls: &[ToolCallRecord],
271        stats: &crate::core::cache::CacheStats,
272        complexity: &crate::core::adaptive::TaskComplexity,
273    ) -> CepComputedStats {
274        let total_original: u64 = calls.iter().map(|c| c.original_tokens as u64).sum();
275        let total_saved: u64 = calls.iter().map(|c| c.saved_tokens as u64).sum();
276        let total_compressed = total_original.saturating_sub(total_saved);
277        let compression_rate = if total_original > 0 {
278            total_saved as f64 / total_original as f64
279        } else {
280            0.0
281        };
282
283        let modes_used: std::collections::HashSet<&str> =
284            calls.iter().filter_map(|c| c.mode.as_deref()).collect();
285        let mode_diversity = (modes_used.len() as f64 / 10.0).min(1.0);
286        let cache_util = stats.hit_rate() / 100.0;
287        let cep_score = cache_util * 0.3 + mode_diversity * 0.2 + compression_rate * 0.5;
288
289        let mut mode_counts: std::collections::HashMap<String, u64> =
290            std::collections::HashMap::new();
291        for call in calls {
292            if let Some(ref mode) = call.mode {
293                *mode_counts.entry(mode.clone()).or_insert(0) += 1;
294            }
295        }
296
297        CepComputedStats {
298            cep_score: (cep_score * 100.0).round() as u32,
299            cache_util: (cache_util * 100.0).round() as u32,
300            mode_diversity: (mode_diversity * 100.0).round() as u32,
301            compression_rate: (compression_rate * 100.0).round() as u32,
302            total_original,
303            total_compressed,
304            total_saved,
305            mode_counts,
306            complexity: format!("{complexity:?}"),
307            cache_hits: stats.cache_hits,
308            total_reads: stats.total_reads,
309            tool_call_count: calls.len() as u64,
310        }
311    }
312
313    async fn write_mcp_live_stats(&self) {
314        let count = self.call_count.load(Ordering::Relaxed);
315        if count > 1 && !count.is_multiple_of(5) {
316            return;
317        }
318
319        let cache = self.cache.read().await;
320        let calls = self.tool_calls.read().await;
321        let stats = cache.get_stats();
322        let complexity = crate::core::adaptive::classify_from_context(&cache);
323
324        let cs = Self::compute_cep_stats(&calls, stats, &complexity);
325        let started_at = calls
326            .first()
327            .map(|c| c.timestamp.clone())
328            .unwrap_or_default();
329
330        drop(cache);
331        drop(calls);
332        let live = serde_json::json!({
333            "cep_score": cs.cep_score,
334            "cache_utilization": cs.cache_util,
335            "mode_diversity": cs.mode_diversity,
336            "compression_rate": cs.compression_rate,
337            "task_complexity": cs.complexity,
338            "files_cached": cs.total_reads,
339            "total_reads": cs.total_reads,
340            "cache_hits": cs.cache_hits,
341            "tokens_saved": cs.total_saved,
342            "tokens_original": cs.total_original,
343            "tool_calls": cs.tool_call_count,
344            "started_at": started_at,
345            "updated_at": chrono::Local::now().to_rfc3339(),
346        });
347
348        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
349            let _ = std::fs::write(dir.join("mcp-live.json"), live.to_string());
350        }
351    }
352
353    /// Persists a CEP (Context Efficiency Protocol) score snapshot for analytics.
354    pub async fn record_cep_snapshot(&self) {
355        let cache = self.cache.read().await;
356        let calls = self.tool_calls.read().await;
357        let stats = cache.get_stats();
358        let complexity = crate::core::adaptive::classify_from_context(&cache);
359
360        let cs = Self::compute_cep_stats(&calls, stats, &complexity);
361
362        drop(cache);
363        drop(calls);
364
365        crate::core::stats::record_cep_session(
366            cs.cep_score,
367            cs.cache_hits,
368            cs.total_reads,
369            cs.total_original,
370            cs.total_compressed,
371            &cs.mode_counts,
372            cs.tool_call_count,
373            &cs.complexity,
374        );
375    }
376}