Skip to main content

lean_ctx/tools/
server_metrics.rs

1use std::sync::atomic::Ordering;
2
3use super::server::{CepComputedStats, CrpMode, LeanCtxServer, ToolCallRecord};
4use super::startup::auto_consolidate_knowledge;
5use super::{ctx_compress, ctx_share};
6
7impl LeanCtxServer {
8    /// Records a tool call's token savings without timing information.
9    pub async fn record_call(
10        &self,
11        tool: &str,
12        original: usize,
13        saved: usize,
14        mode: Option<String>,
15    ) {
16        self.record_call_with_timing(tool, original, saved, mode, 0)
17            .await;
18    }
19
20    /// Records a tool call like `record_call`, but includes an optional file path for observability.
21    pub async fn record_call_with_path(
22        &self,
23        tool: &str,
24        original: usize,
25        saved: usize,
26        mode: Option<String>,
27        path: Option<&str>,
28    ) {
29        self.record_call_with_timing_inner(tool, original, saved, mode, 0, path)
30            .await;
31    }
32
33    /// Records a tool call's token savings, duration, and emits events and stats.
34    pub async fn record_call_with_timing(
35        &self,
36        tool: &str,
37        original: usize,
38        saved: usize,
39        mode: Option<String>,
40        duration_ms: u64,
41    ) {
42        self.record_call_with_timing_inner(tool, original, saved, mode, duration_ms, None)
43            .await;
44    }
45
46    async fn record_call_with_timing_inner(
47        &self,
48        tool: &str,
49        original: usize,
50        saved: usize,
51        mode: Option<String>,
52        duration_ms: u64,
53        path: Option<&str>,
54    ) {
55        let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S").to_string();
56        let mut calls = self.tool_calls.write().await;
57        calls.push(ToolCallRecord {
58            tool: tool.to_string(),
59            original_tokens: original,
60            saved_tokens: saved,
61            mode: mode.clone(),
62            duration_ms,
63            timestamp: ts.clone(),
64        });
65
66        const MAX_TOOL_CALL_RECORDS: usize = 500;
67        if calls.len() > MAX_TOOL_CALL_RECORDS {
68            let excess = calls.len() - MAX_TOOL_CALL_RECORDS;
69            calls.drain(..excess);
70        }
71
72        if duration_ms > 0 {
73            Self::append_tool_call_log(tool, duration_ms, original, saved, mode.as_deref(), &ts);
74        }
75
76        crate::core::events::emit_tool_call(
77            tool,
78            original as u64,
79            saved as u64,
80            mode.clone(),
81            duration_ms,
82            path.map(ToString::to_string),
83        );
84
85        let output_tokens = original.saturating_sub(saved);
86        crate::core::stats::record(tool, original, output_tokens);
87
88        let mut session = self.session.write().await;
89        session.record_tool_call(saved as u64, original as u64);
90        if tool == "ctx_shell" {
91            session.record_command();
92        }
93        let pending_save = if session.should_save() {
94            session.prepare_save().ok()
95        } else {
96            None
97        };
98        drop(calls);
99        drop(session);
100
101        if let Some(prepared) = pending_save {
102            tokio::task::spawn_blocking(move || {
103                let _ = prepared.write_to_disk();
104            });
105        }
106
107        self.write_mcp_live_stats().await;
108    }
109
110    /// Increments the call counter and returns true if a checkpoint is due.
111    pub fn increment_and_check(&self) -> bool {
112        let count = self.call_count.fetch_add(1, Ordering::Relaxed) + 1;
113        let interval = Self::checkpoint_interval_effective();
114        interval > 0 && count.is_multiple_of(interval)
115    }
116
117    /// Generates a compressed context checkpoint with session state and multi-agent sync.
118    pub async fn auto_checkpoint(&self) -> Option<String> {
119        let cache = self.cache.read().await;
120        if cache.get_all_entries().is_empty() {
121            return None;
122        }
123        let complexity = crate::core::adaptive::classify_from_context(&cache);
124        let checkpoint = ctx_compress::handle(&cache, false, CrpMode::effective());
125        drop(cache);
126
127        let mut session = self.session.write().await;
128        let _ = session.save();
129        let session_summary = session.format_compact();
130        let has_insights = !session.findings.is_empty() || !session.decisions.is_empty();
131        let project_root = session.project_root.clone();
132        drop(session);
133
134        if has_insights {
135            if let Some(ref root) = project_root {
136                let root = root.clone();
137                std::thread::spawn(move || {
138                    auto_consolidate_knowledge(&root);
139                });
140            }
141        }
142
143        let multi_agent_block = self
144            .auto_multi_agent_checkpoint(project_root.as_ref())
145            .await;
146
147        self.record_call("ctx_compress", 0, 0, Some("auto".to_string()))
148            .await;
149
150        self.record_cep_snapshot().await;
151
152        if !crate::core::protocol::meta_visible() {
153            return None;
154        }
155
156        // Check if agent has documented anything recently
157        let doc_reminder = {
158            let session = self.session.read().await;
159            let calls_since_last_doc = Self::calls_since_last_documentation(&session);
160            if calls_since_last_doc >= 30 {
161                "\n[CHECKPOINT: please document current progress via ctx_session(action=\"task\") or ctx_knowledge(action=\"remember\")]"
162            } else {
163                ""
164            }
165        };
166
167        Some(format!(
168            "{checkpoint}\n\n--- SESSION STATE ---\n{session_summary}\n\n{}{multi_agent_block}{doc_reminder}",
169            complexity.instruction_suffix()
170        ))
171    }
172
173    async fn auto_multi_agent_checkpoint(&self, project_root: Option<&String>) -> String {
174        let Some(root) = project_root else {
175            return String::new();
176        };
177
178        let registry = crate::core::agents::AgentRegistry::load_or_create();
179        let active = registry.list_active(Some(root));
180        if active.len() <= 1 {
181            return String::new();
182        }
183
184        let agent_id = self.agent_id.read().await;
185        let my_id = match agent_id.as_deref() {
186            Some(id) => id.to_string(),
187            None => return String::new(),
188        };
189        drop(agent_id);
190
191        let cache = self.cache.read().await;
192        let entries = cache.get_all_entries();
193        if !entries.is_empty() {
194            let mut by_access: Vec<_> = entries.iter().collect();
195            by_access.sort_by_key(|x| std::cmp::Reverse(x.1.read_count));
196            let top_paths: Vec<&str> = by_access
197                .iter()
198                .take(5)
199                .map(|(key, _)| key.as_str())
200                .collect();
201            let paths_csv = top_paths.join(",");
202
203            let _ = ctx_share::handle(
204                "push",
205                Some(&my_id),
206                None,
207                Some(&paths_csv),
208                None,
209                &cache,
210                root,
211            );
212        }
213        drop(cache);
214
215        let pending_count = registry
216            .scratchpad
217            .iter()
218            .filter(|e| !e.read_by.contains(&my_id) && e.from_agent != my_id)
219            .count();
220
221        let shared_dir = crate::core::data_dir::lean_ctx_data_dir()
222            .unwrap_or_default()
223            .join("agents")
224            .join("shared");
225        let shared_count = if shared_dir.exists() {
226            std::fs::read_dir(&shared_dir).map_or(0, std::iter::Iterator::count)
227        } else {
228            0
229        };
230
231        let agent_names: Vec<String> = active
232            .iter()
233            .map(|a| {
234                let role = a.role.as_deref().unwrap_or(&a.agent_type);
235                format!("{role}({})", &a.agent_id[..8.min(a.agent_id.len())])
236            })
237            .collect();
238
239        format!(
240            "\n\n--- MULTI-AGENT SYNC ---\nAgents: {} | Pending msgs: {} | Shared contexts: {}\nAuto-shared top-5 cached files.\n--- END SYNC ---",
241            agent_names.join(", "),
242            pending_count,
243            shared_count,
244        )
245    }
246
247    /// Appends a tool call entry to the rotating `tool-calls.log` file.
248    pub fn append_tool_call_log(
249        tool: &str,
250        duration_ms: u64,
251        original: usize,
252        saved: usize,
253        mode: Option<&str>,
254        timestamp: &str,
255    ) {
256        const MAX_LOG_LINES: usize = 50;
257        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
258            let log_path = dir.join("tool-calls.log");
259            let mode_str = mode.unwrap_or("-");
260            let slow = if duration_ms > 5000 { " **SLOW**" } else { "" };
261            let line = format!(
262                "{timestamp}\t{tool}\t{duration_ms}ms\torig={original}\tsaved={saved}\tmode={mode_str}{slow}\n"
263            );
264
265            let mut lines: Vec<String> = std::fs::read_to_string(&log_path)
266                .unwrap_or_default()
267                .lines()
268                .map(std::string::ToString::to_string)
269                .collect();
270
271            lines.push(line.trim_end().to_string());
272            if lines.len() > MAX_LOG_LINES {
273                lines.drain(0..lines.len() - MAX_LOG_LINES);
274            }
275
276            let _ = std::fs::write(&log_path, lines.join("\n") + "\n");
277        }
278    }
279
280    fn compute_cep_stats(
281        calls: &[ToolCallRecord],
282        stats: &crate::core::cache::CacheStats,
283        complexity: &crate::core::adaptive::TaskComplexity,
284    ) -> CepComputedStats {
285        let total_original: u64 = calls.iter().map(|c| c.original_tokens as u64).sum();
286        let total_saved: u64 = calls.iter().map(|c| c.saved_tokens as u64).sum();
287        let total_compressed = total_original.saturating_sub(total_saved);
288        let compression_rate = if total_original > 0 {
289            total_saved as f64 / total_original as f64
290        } else {
291            0.0
292        };
293
294        let modes_used: std::collections::HashSet<&str> =
295            calls.iter().filter_map(|c| c.mode.as_deref()).collect();
296        let mode_diversity = (modes_used.len() as f64 / 10.0).min(1.0);
297        let cache_util = stats.hit_rate() / 100.0;
298        let cep_score = cache_util * 0.3 + mode_diversity * 0.2 + compression_rate * 0.5;
299
300        let mut mode_counts: std::collections::HashMap<String, u64> =
301            std::collections::HashMap::new();
302        for call in calls {
303            if let Some(ref mode) = call.mode {
304                *mode_counts.entry(mode.clone()).or_insert(0) += 1;
305            }
306        }
307
308        CepComputedStats {
309            cep_score: (cep_score * 100.0).round() as u32,
310            cache_util: (cache_util * 100.0).round() as u32,
311            mode_diversity: (mode_diversity * 100.0).round() as u32,
312            compression_rate: (compression_rate * 100.0).round() as u32,
313            total_original,
314            total_compressed,
315            total_saved,
316            mode_counts,
317            complexity: format!("{complexity:?}"),
318            cache_hits: stats.cache_hits,
319            total_reads: stats.total_reads,
320            tool_call_count: calls.len() as u64,
321        }
322    }
323
324    async fn write_mcp_live_stats(&self) {
325        let count = self.call_count.load(Ordering::Relaxed);
326        if count > 1 && !count.is_multiple_of(5) {
327            return;
328        }
329
330        let cache = self.cache.read().await;
331        let calls = self.tool_calls.read().await;
332        let stats = cache.get_stats();
333        let complexity = crate::core::adaptive::classify_from_context(&cache);
334
335        let cs = Self::compute_cep_stats(&calls, stats, &complexity);
336        let started_at = calls
337            .first()
338            .map(|c| c.timestamp.clone())
339            .unwrap_or_default();
340
341        drop(cache);
342        drop(calls);
343        let live = serde_json::json!({
344            "cep_score": cs.cep_score,
345            "cache_utilization": cs.cache_util,
346            "mode_diversity": cs.mode_diversity,
347            "compression_rate": cs.compression_rate,
348            "task_complexity": cs.complexity,
349            "files_cached": cs.total_reads,
350            "total_reads": cs.total_reads,
351            "cache_hits": cs.cache_hits,
352            "tokens_saved": cs.total_saved,
353            "tokens_original": cs.total_original,
354            "tool_calls": cs.tool_call_count,
355            "started_at": started_at,
356            "updated_at": chrono::Local::now().to_rfc3339(),
357        });
358
359        if let Ok(dir) = crate::core::data_dir::lean_ctx_data_dir() {
360            let _ = std::fs::write(dir.join("mcp-live.json"), live.to_string());
361        }
362    }
363
364    /// Persists a CEP (Context Efficiency Protocol) score snapshot for analytics.
365    pub async fn record_cep_snapshot(&self) {
366        let cache = self.cache.read().await;
367        let calls = self.tool_calls.read().await;
368        let stats = cache.get_stats();
369        let complexity = crate::core::adaptive::classify_from_context(&cache);
370
371        let cs = Self::compute_cep_stats(&calls, stats, &complexity);
372
373        drop(cache);
374        drop(calls);
375
376        crate::core::stats::record_cep_session(
377            cs.cep_score,
378            cs.cache_hits,
379            cs.total_reads,
380            cs.total_original,
381            cs.total_compressed,
382            &cs.mode_counts,
383            cs.tool_call_count,
384            &cs.complexity,
385        );
386    }
387
388    /// Counts tool calls since the last documentation action (ctx_knowledge remember, ctx_session task).
389    fn calls_since_last_documentation(session: &crate::core::session::SessionState) -> u64 {
390        let total = session.stats.total_tool_calls;
391        // Use findings and decisions timestamps as proxy for documentation activity
392        let last_finding_ts = session.findings.last().map(|f| f.timestamp);
393        let last_decision_ts = session.decisions.last().map(|d| d.timestamp);
394
395        // If there are recent decisions or explicit task updates, consider it documented
396        if session.task.is_some() {
397            // Task was set at some point — check how many calls ago
398            let task_set_at = session
399                .progress
400                .last()
401                .map(|p| p.timestamp)
402                .or(last_decision_ts)
403                .or(last_finding_ts);
404
405            if let Some(ts) = task_set_at {
406                let age = chrono::Utc::now() - ts;
407                // If last doc action was less than 5 minutes ago, not stale
408                if age.num_minutes() < 5 {
409                    return 0;
410                }
411            }
412        }
413
414        // Conservative: if no decisions at all, report total calls
415        if session.decisions.is_empty() && session.progress.is_empty() {
416            return u64::from(total);
417        }
418
419        // Estimate calls since last documentation based on time
420        u64::from(total.min(30))
421    }
422}