Skip to main content

lean_ctx/tools/registered/
ctx_read.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{
10    get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
11};
12use crate::tool_defs::tool_def;
13
14/// Per-file lock that serializes concurrent reads of the same path.
15///
16/// When multiple subagents read sequentially through a shared set of files,
17/// they tend to hit the same path at the same time. Without per-file locking
18/// they all contend on the global cache write lock while doing redundant I/O.
19/// This lock ensures only one thread reads a given file from disk; the others
20/// wait cheaply on the per-file mutex, then hit the warm cache.
21fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
22    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
23        std::sync::OnceLock::new();
24    let map = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
25    let mut map = map.lock().unwrap();
26
27    const MAX_ENTRIES: usize = 500;
28    if map.len() > MAX_ENTRIES {
29        map.retain(|_, v| Arc::strong_count(v) > 1);
30    }
31
32    map.entry(path.to_string())
33        .or_insert_with(|| Arc::new(Mutex::new(())))
34        .clone()
35}
36
37pub struct CtxReadTool;
38
39impl McpTool for CtxReadTool {
40    fn name(&self) -> &'static str {
41        "ctx_read"
42    }
43
44    fn tool_def(&self) -> Tool {
45        tool_def(
46            "ctx_read",
47            "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
48Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
49            json!({
50                "type": "object",
51                "properties": {
52                    "path": { "type": "string", "description": "Absolute file path to read" },
53                    "mode": {
54                        "type": "string",
55                        "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
56                    },
57                    "start_line": {
58                        "type": "integer",
59                        "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
60                    },
61                    "fresh": {
62                        "type": "boolean",
63                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
64                    }
65                },
66                "required": ["path"]
67            }),
68        )
69    }
70
71    fn handle(
72        &self,
73        args: &Map<String, Value>,
74        ctx: &ToolContext,
75    ) -> Result<ToolOutput, ErrorData> {
76        let path = require_resolved_path(ctx, args, "path")?;
77
78        self.handle_inner(args, ctx, &path)
79    }
80}
81
82impl CtxReadTool {
83    #[allow(clippy::unused_self)]
84    fn handle_inner(
85        &self,
86        args: &Map<String, Value>,
87        ctx: &ToolContext,
88        path: &str,
89    ) -> Result<ToolOutput, ErrorData> {
90        let session_lock = ctx
91            .session
92            .as_ref()
93            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
94        let cache_lock = ctx
95            .cache
96            .as_ref()
97            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
98
99        let current_task = {
100            let session = session_lock.blocking_read();
101            session.task.as_ref().map(|t| t.description.clone())
102        };
103        let task_ref = current_task.as_deref();
104
105        let profile = crate::core::profiles::active_profile();
106        let mut mode = if let Some(m) = get_str(args, "mode") {
107            m
108        } else if profile.read.default_mode_effective() == "auto" {
109            // Non-blocking: if the cache write-lock is held by a timed-out zombie
110            // thread, blocking_read() would hang with NO timeout protection.
111            if let Ok(cache) = cache_lock.try_read() {
112                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
113            } else {
114                tracing::debug!(
115                    "cache lock contested during auto-mode selection for {path}; \
116                     falling back to full"
117                );
118                "full".to_string()
119            }
120        } else {
121            profile.read.default_mode_effective().to_string()
122        };
123
124        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
125        let start_line = get_int(args, "start_line");
126        if let Some(sl) = start_line {
127            let sl = sl.max(1_i64);
128            mode = format!("lines:{sl}-999999");
129            fresh = true;
130        }
131
132        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
133        let resolved_agent_id = ctx
134            .agent_id
135            .as_ref()
136            .and_then(|a| a.blocking_read().clone());
137        let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
138            path,
139            &mode,
140            task_ref,
141            Some(&ctx.project_root),
142            pressure_action,
143            resolved_agent_id.as_deref(),
144        );
145        if gate_result.budget_blocked {
146            let msg = gate_result
147                .budget_warning
148                .unwrap_or_else(|| "Agent token budget exceeded".to_string());
149            return Err(ErrorData::invalid_params(msg, None));
150        }
151        let budget_warning = gate_result.budget_warning.clone();
152        if let Some(overridden) = gate_result.overridden_mode {
153            mode = overridden;
154        }
155
156        let mode = if crate::tools::ctx_read::is_instruction_file(path) {
157            "full".to_string()
158        } else {
159            auto_degrade_read_mode(&mode)
160        };
161
162        if mode.starts_with("lines:") {
163            fresh = true;
164        }
165
166        if crate::core::binary_detect::is_binary_file(path) {
167            let msg = crate::core::binary_detect::binary_file_message(path);
168            return Err(ErrorData::invalid_params(msg, None));
169        }
170        {
171            let cap = crate::core::limits::max_read_bytes() as u64;
172            if let Ok(meta) = std::fs::metadata(path) {
173                if meta.len() > cap {
174                    let msg = format!(
175                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
176                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
177                        meta.len(),
178                        cap
179                    );
180                    return Err(ErrorData::invalid_params(msg, None));
181                }
182            }
183        }
184
185        // Fast path: if both per-file lock and cache write-lock are immediately
186        // available, execute inline without spawning a thread. This avoids thread +
187        // channel overhead for the ~90% of calls that are cache hits.
188        let read_timeout = std::time::Duration::from_secs(30);
189        let cancelled = Arc::new(AtomicBool::new(false));
190        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
191            let crp_mode = ctx.crp_mode;
192            let task_ref = current_task.as_deref();
193
194            let fast_result = 'fast: {
195                let file_lock = per_file_lock(path);
196                let Some(_file_guard) = file_lock.try_lock().ok() else {
197                    break 'fast None;
198                };
199                let Some(mut cache) = cache_lock.try_write().ok() else {
200                    break 'fast None;
201                };
202                let read_output = if fresh {
203                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
204                        &mut cache, path, &mode, crp_mode, task_ref,
205                    )
206                } else {
207                    crate::tools::ctx_read::handle_with_task_resolved(
208                        &mut cache, path, &mode, crp_mode, task_ref,
209                    )
210                };
211                let content = read_output.content;
212                let rmode = read_output.resolved_mode;
213                let orig = cache.get(path).map_or(0, |e| e.original_tokens);
214                let hit = content.contains(" cached ")
215                    || content.contains("[unchanged")
216                    || content.contains("[delta:");
217                let fref = cache.file_ref_map().get(path).cloned();
218                let stats = cache.get_stats();
219                let stats_snapshot = (stats.total_reads, stats.cache_hits);
220                Some((content, rmode, orig, hit, fref, stats_snapshot))
221            };
222
223            if let Some(result) = fast_result {
224                result
225            } else {
226                // Slow path: spawn thread with bounded timeout for contended locks.
227                let cache_lock = cache_lock.clone();
228                let mode = mode.clone();
229                let task_owned = current_task.clone();
230                let path_owned = path.to_string();
231                let cancel_flag = cancelled.clone();
232                let (tx, rx) = std::sync::mpsc::sync_channel(1);
233                std::thread::spawn(move || {
234                    let file_lock = per_file_lock(&path_owned);
235
236                    // Bounded per-file lock: if a zombie thread still holds it, don't
237                    // wait forever. 25s keeps us inside the 30s recv_timeout.
238                    let _file_guard = {
239                        let deadline =
240                            std::time::Instant::now() + std::time::Duration::from_secs(25);
241                        loop {
242                            if cancel_flag.load(Ordering::Relaxed) {
243                                return;
244                            }
245                            if let Ok(guard) = file_lock.try_lock() {
246                                break guard;
247                            }
248                            if std::time::Instant::now() >= deadline {
249                                tracing::error!(
250                                    "ctx_read: per-file lock timeout after 25s for {path_owned}"
251                                );
252                                return;
253                            }
254                            std::thread::sleep(std::time::Duration::from_millis(50));
255                        }
256                    };
257
258                    if cancel_flag.load(Ordering::Relaxed) {
259                        return;
260                    }
261
262                    // Bounded cache write-lock: avoids indefinite block when a zombie
263                    // thread from a previous timed-out call still holds the lock.
264                    let mut cache = {
265                        let deadline =
266                            std::time::Instant::now() + std::time::Duration::from_secs(25);
267                        loop {
268                            if cancel_flag.load(Ordering::Relaxed) {
269                                return;
270                            }
271                            if let Ok(guard) = cache_lock.try_write() {
272                                break guard;
273                            }
274                            if std::time::Instant::now() >= deadline {
275                                tracing::error!(
276                                    "ctx_read: cache write-lock timeout after 25s for {path_owned}"
277                                );
278                                return;
279                            }
280                            std::thread::sleep(std::time::Duration::from_millis(50));
281                        }
282                    };
283
284                    let task_ref = task_owned.as_deref();
285                    let read_output = if fresh {
286                        crate::tools::ctx_read::handle_fresh_with_task_resolved(
287                            &mut cache,
288                            &path_owned,
289                            &mode,
290                            crp_mode,
291                            task_ref,
292                        )
293                    } else {
294                        crate::tools::ctx_read::handle_with_task_resolved(
295                            &mut cache,
296                            &path_owned,
297                            &mode,
298                            crp_mode,
299                            task_ref,
300                        )
301                    };
302                    let content = read_output.content;
303                    let rmode = read_output.resolved_mode;
304                    let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
305                    let hit = content.contains(" cached ");
306                    let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
307                    let stats = cache.get_stats();
308                    let stats_snapshot = (stats.total_reads, stats.cache_hits);
309                    let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
310                });
311                if let Ok(result) = rx.recv_timeout(read_timeout) {
312                    result
313                } else {
314                    cancelled.store(true, Ordering::Relaxed);
315                    tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
316                    let msg = format!(
317                        "ERROR: ctx_read timed out after {}s reading {path}. \
318                     The file may be very large or a blocking I/O issue occurred. \
319                     Try mode=\"lines:1-100\" for a partial read.",
320                        read_timeout.as_secs()
321                    );
322                    return Err(ErrorData::internal_error(msg, None));
323                }
324            } // end else (slow path)
325        };
326
327        // Convert error results to proper MCP ErrorData instead of success body
328        if resolved_mode == "error" {
329            return Err(ErrorData::invalid_params(output, None));
330        }
331
332        let output_tokens = crate::core::tokens::count_tokens(&output);
333        let saved = original.saturating_sub(output_tokens);
334
335        // Session updates (short lock)
336        let mut ensured_root: Option<String> = None;
337        let project_root_snapshot;
338        {
339            let mut session = session_lock.blocking_write();
340            session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
341            if is_cache_hit {
342                session.record_cache_hit();
343            }
344            if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
345                let touched: Vec<String> = session
346                    .files_touched
347                    .iter()
348                    .map(|f| f.path.clone())
349                    .collect();
350                let inferred =
351                    crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
352                if inferred.confidence >= 0.4 {
353                    session.active_structured_intent = Some(inferred);
354                }
355            }
356            let root_missing = session
357                .project_root
358                .as_deref()
359                .is_none_or(|r| r.trim().is_empty());
360            if root_missing {
361                if let Some(root) = crate::core::protocol::detect_project_root(path) {
362                    session.project_root = Some(root.clone());
363                    ensured_root = Some(root);
364                }
365            }
366            project_root_snapshot = session
367                .project_root
368                .clone()
369                .unwrap_or_else(|| ".".to_string());
370        }
371
372        if let Some(root) = ensured_root.as_deref() {
373            crate::core::index_orchestrator::ensure_all_background(root);
374        }
375
376        crate::core::heatmap::record_file_access(path, original, saved);
377
378        // Mode predictor + feedback — no locks needed, uses snapshots from above
379        {
380            let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
381            let density = if output_tokens > 0 {
382                original as f64 / output_tokens as f64
383            } else {
384                1.0
385            };
386            let outcome = crate::core::mode_predictor::ModeOutcome {
387                mode: resolved_mode.clone(),
388                tokens_in: original,
389                tokens_out: output_tokens,
390                density: density.min(1.0),
391            };
392            let mut predictor = crate::core::mode_predictor::ModePredictor::new();
393            predictor.set_project_root(&project_root_snapshot);
394            predictor.record(sig, outcome);
395            predictor.save();
396
397            let ext = std::path::Path::new(path)
398                .extension()
399                .and_then(|e| e.to_str())
400                .unwrap_or("")
401                .to_string();
402            let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
403            let feedback_outcome = crate::core::feedback::CompressionOutcome {
404                session_id: format!("{}", std::process::id()),
405                language: ext,
406                entropy_threshold: thresholds.bpe_entropy,
407                jaccard_threshold: thresholds.jaccard,
408                total_turns: cache_stats.0 as u32,
409                tokens_saved: saved as u64,
410                tokens_original: original as u64,
411                cache_hits: cache_stats.1 as u32,
412                total_reads: cache_stats.0 as u32,
413                task_completed: true,
414                timestamp: chrono::Local::now().to_rfc3339(),
415            };
416            let mut store = crate::core::feedback::FeedbackStore::load();
417            store.project_root = Some(project_root_snapshot.clone());
418            store.record_outcome(feedback_outcome);
419        }
420
421        if let Some(aid) = resolved_agent_id.as_deref() {
422            crate::core::agent_budget::record_consumption(aid, output_tokens);
423        }
424
425        let final_output = if let Some(ref warning) = budget_warning {
426            format!("{output}\n\n{warning}")
427        } else {
428            output
429        };
430
431        Ok(ToolOutput {
432            text: final_output,
433            original_tokens: original,
434            saved_tokens: saved,
435            mode: Some(resolved_mode),
436            path: Some(path.to_string()),
437            changed: false,
438        })
439    }
440}
441
442fn auto_degrade_read_mode(mode: &str) -> String {
443    use crate::core::degradation_policy::DegradationVerdictV1;
444    let profile = crate::core::profiles::active_profile();
445    if !profile.degradation.enforce_effective() {
446        return mode.to_string();
447    }
448    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
449    match policy.decision.verdict {
450        DegradationVerdictV1::Ok => mode.to_string(),
451        DegradationVerdictV1::Warn => match mode {
452            "full" => "map".to_string(),
453            other => other.to_string(),
454        },
455        DegradationVerdictV1::Throttle => match mode {
456            "full" | "map" => "signatures".to_string(),
457            other => other.to_string(),
458        },
459        DegradationVerdictV1::Block => "signatures".to_string(),
460    }
461}
462
463#[cfg(test)]
464mod tests {
465    use super::*;
466    use std::sync::atomic::{AtomicUsize, Ordering};
467
468    #[test]
469    fn per_file_lock_same_path_returns_same_mutex() {
470        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
471        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
472        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
473    }
474
475    #[test]
476    fn per_file_lock_different_paths_return_different_mutexes() {
477        let lock_a = per_file_lock("/tmp/test_path_a.txt");
478        let lock_b = per_file_lock("/tmp/test_path_b.txt");
479        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
480    }
481
482    #[test]
483    fn per_file_lock_serializes_concurrent_access() {
484        let counter = Arc::new(AtomicUsize::new(0));
485        let max_concurrent = Arc::new(AtomicUsize::new(0));
486        let path = "/tmp/test_concurrent_serialization.txt";
487        let mut handles = Vec::new();
488
489        for _ in 0..5 {
490            let counter = counter.clone();
491            let max_concurrent = max_concurrent.clone();
492            let path = path.to_string();
493            handles.push(std::thread::spawn(move || {
494                let lock = per_file_lock(&path);
495                let _guard = lock.lock().unwrap();
496                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
497                max_concurrent.fetch_max(active, Ordering::SeqCst);
498                std::thread::sleep(std::time::Duration::from_millis(10));
499                counter.fetch_sub(1, Ordering::SeqCst);
500            }));
501        }
502
503        for h in handles {
504            h.join().unwrap();
505        }
506
507        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
508    }
509
510    #[test]
511    fn per_file_lock_allows_parallel_different_paths() {
512        let counter = Arc::new(AtomicUsize::new(0));
513        let max_concurrent = Arc::new(AtomicUsize::new(0));
514        let mut handles = Vec::new();
515
516        for i in 0..4 {
517            let counter = counter.clone();
518            let max_concurrent = max_concurrent.clone();
519            let path = format!("/tmp/test_parallel_{i}.txt");
520            handles.push(std::thread::spawn(move || {
521                let lock = per_file_lock(&path);
522                let _guard = lock.lock().unwrap();
523                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
524                max_concurrent.fetch_max(active, Ordering::SeqCst);
525                std::thread::sleep(std::time::Duration::from_millis(50));
526                counter.fetch_sub(1, Ordering::SeqCst);
527            }));
528        }
529
530        for h in handles {
531            h.join().unwrap();
532        }
533
534        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
535    }
536
537    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
538    /// must not block subsequent reads indefinitely. The try_write() loop inside
539    /// the spawned thread should respect its 25s deadline and the cancellation flag.
540    #[test]
541    fn zombie_thread_does_not_block_subsequent_cache_access() {
542        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));
543
544        // Simulate a zombie: hold the write-lock on a background thread for 2s.
545        let zombie_lock = cache.clone();
546        let _zombie = std::thread::spawn(move || {
547            let _guard = zombie_lock.blocking_write();
548            std::thread::sleep(std::time::Duration::from_secs(2));
549        });
550        std::thread::sleep(std::time::Duration::from_millis(50));
551
552        // A try_read() must fail immediately (zombie holds write-lock).
553        assert!(cache.try_read().is_err());
554
555        // A try_write() loop with cancellation must exit promptly.
556        let cancel = Arc::new(AtomicBool::new(false));
557        let cancel2 = cancel.clone();
558        let lock2 = cache.clone();
559        let waiter = std::thread::spawn(move || {
560            let start = std::time::Instant::now();
561            loop {
562                if cancel2.load(Ordering::Relaxed) {
563                    return (false, start.elapsed());
564                }
565                if let Ok(_guard) = lock2.try_write() {
566                    return (true, start.elapsed());
567                }
568                std::thread::sleep(std::time::Duration::from_millis(50));
569            }
570        });
571
572        // Set cancellation after 200ms — the loop should exit quickly.
573        std::thread::sleep(std::time::Duration::from_millis(200));
574        cancel.store(true, Ordering::Relaxed);
575
576        let (acquired, elapsed) = waiter.join().unwrap();
577        assert!(
578            !acquired,
579            "should not have acquired lock while zombie holds it"
580        );
581        assert!(
582            elapsed < std::time::Duration::from_secs(1),
583            "cancellation should have stopped the loop promptly"
584        );
585    }
586}