Skip to main content

lean_ctx/tools/registered/
ctx_read.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{get_bool, get_int, get_str, McpTool, ToolContext, ToolOutput};
10use crate::tool_defs::tool_def;
11
/// Per-file lock that serializes concurrent reads of the same path.
///
/// When multiple subagents read sequentially through a shared set of files,
/// they tend to hit the same path at the same time. Without per-file locking
/// they all contend on the global cache write lock while doing redundant I/O.
/// This lock ensures only one thread reads a given file from disk; the others
/// wait cheaply on the per-file mutex, then hit the warm cache.
///
/// Returns the shared mutex registered for `path`, creating it on first use.
/// Two calls with the same path yield the same `Arc` for as long as the entry
/// stays in the registry (entries that are still held are never pruned).
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    // Registry size above which entries nobody holds any more are pruned.
    const MAX_ENTRIES: usize = 500;

    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();

    // A poisoned registry must not turn every later read into a panic: the map
    // only stores `Arc<Mutex<()>>` handles, so there is no invariant a panicking
    // holder could have left half-updated. Recover the guard and carry on.
    let mut registry = FILE_LOCKS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // A strong count of 1 means the registry holds the only reference, i.e. no
    // reader currently owns this lock, so the entry can be dropped safely.
    if registry.len() > MAX_ENTRIES {
        registry.retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    Arc::clone(
        registry
            .entry(path.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(()))),
    )
}
34
35pub struct CtxReadTool;
36
37impl McpTool for CtxReadTool {
38    fn name(&self) -> &'static str {
39        "ctx_read"
40    }
41
42    fn tool_def(&self) -> Tool {
43        tool_def(
44            "ctx_read",
45            "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
46Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
47            json!({
48                "type": "object",
49                "properties": {
50                    "path": { "type": "string", "description": "Absolute file path to read" },
51                    "mode": {
52                        "type": "string",
53                        "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
54                    },
55                    "start_line": {
56                        "type": "integer",
57                        "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
58                    },
59                    "fresh": {
60                        "type": "boolean",
61                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
62                    }
63                },
64                "required": ["path"]
65            }),
66        )
67    }
68
69    fn handle(
70        &self,
71        args: &Map<String, Value>,
72        ctx: &ToolContext,
73    ) -> Result<ToolOutput, ErrorData> {
74        let path = ctx
75            .resolved_path("path")
76            .ok_or_else(|| ErrorData::invalid_params("path is required", None))?
77            .to_string();
78
79        self.handle_inner(args, ctx, &path)
80    }
81}
82
83impl CtxReadTool {
84    #[allow(clippy::unused_self)]
85    fn handle_inner(
86        &self,
87        args: &Map<String, Value>,
88        ctx: &ToolContext,
89        path: &str,
90    ) -> Result<ToolOutput, ErrorData> {
91        let session_lock = ctx
92            .session
93            .as_ref()
94            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
95        let cache_lock = ctx
96            .cache
97            .as_ref()
98            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
99
100        let current_task = {
101            let session = session_lock.blocking_read();
102            session.task.as_ref().map(|t| t.description.clone())
103        };
104        let task_ref = current_task.as_deref();
105
106        let profile = crate::core::profiles::active_profile();
107        let mut mode = if let Some(m) = get_str(args, "mode") {
108            m
109        } else if profile.read.default_mode_effective() == "auto" {
110            // Non-blocking: if the cache write-lock is held by a timed-out zombie
111            // thread, blocking_read() would hang with NO timeout protection.
112            if let Ok(cache) = cache_lock.try_read() {
113                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
114            } else {
115                tracing::debug!(
116                    "cache lock contested during auto-mode selection for {path}; \
117                     falling back to full"
118                );
119                "full".to_string()
120            }
121        } else {
122            profile.read.default_mode_effective().to_string()
123        };
124
125        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
126        let start_line = get_int(args, "start_line");
127        if let Some(sl) = start_line {
128            let sl = sl.max(1_i64);
129            mode = format!("lines:{sl}-999999");
130            fresh = true;
131        }
132
133        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
134        let resolved_agent_id = ctx
135            .agent_id
136            .as_ref()
137            .and_then(|a| a.blocking_read().clone());
138        let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
139            path,
140            &mode,
141            task_ref,
142            Some(&ctx.project_root),
143            pressure_action,
144            resolved_agent_id.as_deref(),
145        );
146        if gate_result.budget_blocked {
147            let msg = gate_result
148                .budget_warning
149                .unwrap_or_else(|| "Agent token budget exceeded".to_string());
150            return Err(ErrorData::invalid_params(msg, None));
151        }
152        let budget_warning = gate_result.budget_warning.clone();
153        if let Some(overridden) = gate_result.overridden_mode {
154            mode = overridden;
155        }
156
157        let mode = if crate::tools::ctx_read::is_instruction_file(path) {
158            "full".to_string()
159        } else {
160            auto_degrade_read_mode(&mode)
161        };
162
163        if mode.starts_with("lines:") {
164            fresh = true;
165        }
166
167        if crate::core::binary_detect::is_binary_file(path) {
168            let msg = crate::core::binary_detect::binary_file_message(path);
169            return Err(ErrorData::invalid_params(msg, None));
170        }
171        {
172            let cap = crate::core::limits::max_read_bytes() as u64;
173            if let Ok(meta) = std::fs::metadata(path) {
174                if meta.len() > cap {
175                    let msg = format!(
176                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
177                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
178                        meta.len(),
179                        cap
180                    );
181                    return Err(ErrorData::invalid_params(msg, None));
182                }
183            }
184        }
185
186        // Acquire cache write lock with timeout guard to prevent infinite hangs.
187        // All lock acquisitions (per-file mutex, cache RwLock) use bounded waits
188        // so a zombie thread from a previous timed-out call cannot block us forever.
189        let read_timeout = std::time::Duration::from_secs(30);
190        let cancelled = Arc::new(AtomicBool::new(false));
191        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
192            let cache_lock = cache_lock.clone();
193            let mode = mode.clone();
194            let crp_mode = ctx.crp_mode;
195            let task_owned = current_task.clone();
196            let path_owned = path.to_string();
197            let cancel_flag = cancelled.clone();
198            let (tx, rx) = std::sync::mpsc::sync_channel(1);
199            std::thread::spawn(move || {
200                let file_lock = per_file_lock(&path_owned);
201
202                // Bounded per-file lock: if a zombie thread still holds it, don't
203                // wait forever. 25s keeps us inside the 30s recv_timeout.
204                let _file_guard = {
205                    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
206                    loop {
207                        if cancel_flag.load(Ordering::Relaxed) {
208                            return;
209                        }
210                        if let Ok(guard) = file_lock.try_lock() {
211                            break guard;
212                        }
213                        if std::time::Instant::now() >= deadline {
214                            tracing::error!(
215                                "ctx_read: per-file lock timeout after 25s for {path_owned}"
216                            );
217                            return;
218                        }
219                        std::thread::sleep(std::time::Duration::from_millis(50));
220                    }
221                };
222
223                if cancel_flag.load(Ordering::Relaxed) {
224                    return;
225                }
226
227                // Bounded cache write-lock: avoids indefinite block when a zombie
228                // thread from a previous timed-out call still holds the lock.
229                let mut cache = {
230                    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
231                    loop {
232                        if cancel_flag.load(Ordering::Relaxed) {
233                            return;
234                        }
235                        if let Ok(guard) = cache_lock.try_write() {
236                            break guard;
237                        }
238                        if std::time::Instant::now() >= deadline {
239                            tracing::error!(
240                                "ctx_read: cache write-lock timeout after 25s for {path_owned}"
241                            );
242                            return;
243                        }
244                        std::thread::sleep(std::time::Duration::from_millis(50));
245                    }
246                };
247
248                let task_ref = task_owned.as_deref();
249                let read_output = if fresh {
250                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
251                        &mut cache,
252                        &path_owned,
253                        &mode,
254                        crp_mode,
255                        task_ref,
256                    )
257                } else {
258                    crate::tools::ctx_read::handle_with_task_resolved(
259                        &mut cache,
260                        &path_owned,
261                        &mode,
262                        crp_mode,
263                        task_ref,
264                    )
265                };
266                let content = read_output.content;
267                let rmode = read_output.resolved_mode;
268                let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
269                let hit = content.contains(" cached ");
270                let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
271                let stats = cache.get_stats();
272                let stats_snapshot = (stats.total_reads, stats.cache_hits);
273                let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
274            });
275            if let Ok(result) = rx.recv_timeout(read_timeout) {
276                result
277            } else {
278                cancelled.store(true, Ordering::Relaxed);
279                tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
280                let msg = format!(
281                    "ERROR: ctx_read timed out after {}s reading {path}. \
282                     The file may be very large or a blocking I/O issue occurred. \
283                     Try mode=\"lines:1-100\" for a partial read.",
284                    read_timeout.as_secs()
285                );
286                return Err(ErrorData::internal_error(msg, None));
287            }
288        };
289
290        let output_tokens = crate::core::tokens::count_tokens(&output);
291        let saved = original.saturating_sub(output_tokens);
292
293        // Session updates (short lock)
294        let mut ensured_root: Option<String> = None;
295        let project_root_snapshot;
296        {
297            let mut session = session_lock.blocking_write();
298            session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
299            if is_cache_hit {
300                session.record_cache_hit();
301            }
302            if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
303                let touched: Vec<String> = session
304                    .files_touched
305                    .iter()
306                    .map(|f| f.path.clone())
307                    .collect();
308                let inferred =
309                    crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
310                if inferred.confidence >= 0.4 {
311                    session.active_structured_intent = Some(inferred);
312                }
313            }
314            let root_missing = session
315                .project_root
316                .as_deref()
317                .is_none_or(|r| r.trim().is_empty());
318            if root_missing {
319                if let Some(root) = crate::core::protocol::detect_project_root(path) {
320                    session.project_root = Some(root.clone());
321                    ensured_root = Some(root);
322                }
323            }
324            project_root_snapshot = session
325                .project_root
326                .clone()
327                .unwrap_or_else(|| ".".to_string());
328        }
329
330        if let Some(root) = ensured_root.as_deref() {
331            crate::core::index_orchestrator::ensure_all_background(root);
332        }
333
334        crate::core::heatmap::record_file_access(path, original, saved);
335
336        // Mode predictor + feedback — no locks needed, uses snapshots from above
337        {
338            let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
339            let density = if output_tokens > 0 {
340                original as f64 / output_tokens as f64
341            } else {
342                1.0
343            };
344            let outcome = crate::core::mode_predictor::ModeOutcome {
345                mode: resolved_mode.clone(),
346                tokens_in: original,
347                tokens_out: output_tokens,
348                density: density.min(1.0),
349            };
350            let mut predictor = crate::core::mode_predictor::ModePredictor::new();
351            predictor.set_project_root(&project_root_snapshot);
352            predictor.record(sig, outcome);
353            predictor.save();
354
355            let ext = std::path::Path::new(path)
356                .extension()
357                .and_then(|e| e.to_str())
358                .unwrap_or("")
359                .to_string();
360            let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
361            let feedback_outcome = crate::core::feedback::CompressionOutcome {
362                session_id: format!("{}", std::process::id()),
363                language: ext,
364                entropy_threshold: thresholds.bpe_entropy,
365                jaccard_threshold: thresholds.jaccard,
366                total_turns: cache_stats.0 as u32,
367                tokens_saved: saved as u64,
368                tokens_original: original as u64,
369                cache_hits: cache_stats.1 as u32,
370                total_reads: cache_stats.0 as u32,
371                task_completed: true,
372                timestamp: chrono::Local::now().to_rfc3339(),
373            };
374            let mut store = crate::core::feedback::FeedbackStore::load();
375            store.project_root = Some(project_root_snapshot.clone());
376            store.record_outcome(feedback_outcome);
377        }
378
379        if let Some(aid) = resolved_agent_id.as_deref() {
380            crate::core::agent_budget::record_consumption(aid, output_tokens);
381        }
382
383        let final_output = if let Some(ref warning) = budget_warning {
384            format!("{output}\n\n{warning}")
385        } else {
386            output
387        };
388
389        Ok(ToolOutput {
390            text: final_output,
391            original_tokens: original,
392            saved_tokens: saved,
393            mode: Some(resolved_mode),
394            path: Some(path.to_string()),
395            changed: false,
396        })
397    }
398}
399
400fn auto_degrade_read_mode(mode: &str) -> String {
401    use crate::core::degradation_policy::DegradationVerdictV1;
402    let profile = crate::core::profiles::active_profile();
403    if !profile.degradation.enforce_effective() {
404        return mode.to_string();
405    }
406    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
407    match policy.decision.verdict {
408        DegradationVerdictV1::Ok => mode.to_string(),
409        DegradationVerdictV1::Warn => match mode {
410            "full" => "map".to_string(),
411            other => other.to_string(),
412        },
413        DegradationVerdictV1::Throttle => match mode {
414            "full" | "map" => "signatures".to_string(),
415            other => other.to_string(),
416        },
417        DegradationVerdictV1::Block => "signatures".to_string(),
418    }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Repeated lookups of one path must hand out the same shared mutex.
    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
    }

    /// Distinct paths must not share a mutex.
    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let lock_a = per_file_lock("/tmp/test_path_a.txt");
        let lock_b = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
    }

    /// Five threads locking the same path must never overlap inside the
    /// critical section.
    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let path = "/tmp/test_concurrent_serialization.txt";
        let mut handles = Vec::new();

        for _ in 0..5 {
            let counter = counter.clone();
            let max_concurrent = max_concurrent.clone();
            let path = path.to_string();
            handles.push(std::thread::spawn(move || {
                let lock = per_file_lock(&path);
                let _guard = lock.lock().unwrap();
                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
                max_concurrent.fetch_max(active, Ordering::SeqCst);
                std::thread::sleep(std::time::Duration::from_millis(10));
                counter.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
    }

    /// Threads locking different paths are expected to overlap.
    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::new();

        for i in 0..4 {
            let counter = counter.clone();
            let max_concurrent = max_concurrent.clone();
            let path = format!("/tmp/test_parallel_{i}.txt");
            handles.push(std::thread::spawn(move || {
                let lock = per_file_lock(&path);
                let _guard = lock.lock().unwrap();
                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
                max_concurrent.fetch_max(active, Ordering::SeqCst);
                std::thread::sleep(std::time::Duration::from_millis(50));
                counter.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
    }

    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
    /// must not block subsequent reads indefinitely. This exercises the
    /// cancellation path of a `try_write()` polling loop shaped like the one in
    /// the read worker.
    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        // Simulate a zombie: hold the write-lock on a background thread for 2s.
        // The channel handshake guarantees the lock is actually held before the
        // assertions below run, instead of hoping a fixed sleep was long enough.
        let zombie_lock = cache.clone();
        let (locked_tx, locked_rx) = std::sync::mpsc::channel();
        let _zombie = std::thread::spawn(move || {
            let _guard = zombie_lock.blocking_write();
            let _ = locked_tx.send(());
            std::thread::sleep(std::time::Duration::from_secs(2));
        });
        locked_rx
            .recv_timeout(std::time::Duration::from_secs(5))
            .expect("zombie thread should have acquired the write-lock");

        // A try_read() must fail immediately (zombie holds write-lock).
        assert!(cache.try_read().is_err());

        // A try_write() loop with cancellation must exit promptly.
        let cancel = Arc::new(AtomicBool::new(false));
        let cancel2 = cancel.clone();
        let lock2 = cache.clone();
        let waiter = std::thread::spawn(move || {
            let start = std::time::Instant::now();
            loop {
                if cancel2.load(Ordering::Relaxed) {
                    return (false, start.elapsed());
                }
                if let Ok(_guard) = lock2.try_write() {
                    return (true, start.elapsed());
                }
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        });

        // Set cancellation after 200ms — the loop should exit quickly.
        std::thread::sleep(std::time::Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < std::time::Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );
    }
}