// lean_ctx/tools/registered/ctx_read.rs

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use rmcp::model::Tool;
use rmcp::ErrorData;
use serde_json::{json, Map, Value};

use crate::server::tool_trait::{get_bool, get_int, get_str, McpTool, ToolContext, ToolOutput};
use crate::tool_defs::tool_def;

/// Per-file lock that serializes concurrent reads of the same path.
///
/// When multiple subagents read sequentially through a shared set of files,
/// they tend to hit the same path at the same time. Without per-file locking
/// they all contend on the global cache write lock while doing redundant I/O.
/// This lock ensures only one thread reads a given file from disk; the others
/// wait cheaply on the per-file mutex, then hit the warm cache.
///
/// Locks are keyed by the `path` string exactly as given (no canonicalization).
/// Repeated calls with the same string return the same `Arc` for as long as
/// any caller still holds it.
///
/// The registry is pruned once it reaches 1024 entries. Locks referenced only
/// by the registry are dropped, because nobody holds or waits on them. This
/// keeps a long-running process from accumulating one entry per file ever read.
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    /// Registry size at which idle (registry-only) entries are dropped.
    const PRUNE_THRESHOLD: usize = 1024;

    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();

    let registry = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
    // The map only stores `Arc<Mutex<()>>` handles, so a panic while it was
    // locked cannot leave it inconsistent. Recover rather than panicking on
    // every future read.
    let mut map = registry
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    if map.len() >= PRUNE_THRESHOLD {
        // A strong count of 1 means only the registry holds the handle. New
        // clones are only handed out under this registry mutex, so such an
        // entry cannot be in use and is safe to drop.
        map.retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    let lock = map
        .entry(path.to_owned())
        .or_insert_with(|| Arc::new(Mutex::new(())));
    Arc::clone(lock)
}

29pub struct CtxReadTool;
30
31impl McpTool for CtxReadTool {
32    fn name(&self) -> &'static str {
33        "ctx_read"
34    }
35
36    fn tool_def(&self) -> Tool {
37        tool_def(
38            "ctx_read",
39            "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
40Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
41            json!({
42                "type": "object",
43                "properties": {
44                    "path": { "type": "string", "description": "Absolute file path to read" },
45                    "mode": {
46                        "type": "string",
47                        "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
48                    },
49                    "start_line": {
50                        "type": "integer",
51                        "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
52                    },
53                    "fresh": {
54                        "type": "boolean",
55                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
56                    }
57                },
58                "required": ["path"]
59            }),
60        )
61    }
62
63    fn handle(
64        &self,
65        args: &Map<String, Value>,
66        ctx: &ToolContext,
67    ) -> Result<ToolOutput, ErrorData> {
68        let path = ctx
69            .resolved_path("path")
70            .ok_or_else(|| ErrorData::invalid_params("path is required", None))?
71            .to_string();
72
73        self.handle_inner(args, ctx, &path)
74    }
75}
76
77impl CtxReadTool {
78    #[allow(clippy::unused_self)]
79    fn handle_inner(
80        &self,
81        args: &Map<String, Value>,
82        ctx: &ToolContext,
83        path: &str,
84    ) -> Result<ToolOutput, ErrorData> {
85        let session_lock = ctx
86            .session
87            .as_ref()
88            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
89        let cache_lock = ctx
90            .cache
91            .as_ref()
92            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
93
94        let current_task = {
95            let session = session_lock.blocking_read();
96            session.task.as_ref().map(|t| t.description.clone())
97        };
98        let task_ref = current_task.as_deref();
99
100        let profile = crate::core::profiles::active_profile();
101        let mut mode = if let Some(m) = get_str(args, "mode") {
102            m
103        } else if profile.read.default_mode_effective() == "auto" {
104            // Non-blocking: if the cache write-lock is held by a timed-out zombie
105            // thread, blocking_read() would hang with NO timeout protection.
106            if let Ok(cache) = cache_lock.try_read() {
107                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
108            } else {
109                tracing::debug!(
110                    "cache lock contested during auto-mode selection for {path}; \
111                     falling back to full"
112                );
113                "full".to_string()
114            }
115        } else {
116            profile.read.default_mode_effective().to_string()
117        };
118
119        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
120        let start_line = get_int(args, "start_line");
121        if let Some(sl) = start_line {
122            let sl = sl.max(1_i64);
123            mode = format!("lines:{sl}-999999");
124            fresh = true;
125        }
126
127        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
128        let gate_result = crate::server::context_gate::pre_dispatch_read(
129            path,
130            &mode,
131            task_ref,
132            Some(&ctx.project_root),
133            pressure_action,
134        );
135        if let Some(overridden) = gate_result.overridden_mode {
136            mode = overridden;
137        }
138
139        let mode = if crate::tools::ctx_read::is_instruction_file(path) {
140            "full".to_string()
141        } else {
142            auto_degrade_read_mode(&mode)
143        };
144
145        if mode.starts_with("lines:") {
146            fresh = true;
147        }
148
149        if crate::core::binary_detect::is_binary_file(path) {
150            let msg = crate::core::binary_detect::binary_file_message(path);
151            return Err(ErrorData::invalid_params(msg, None));
152        }
153        {
154            let cap = crate::core::limits::max_read_bytes() as u64;
155            if let Ok(meta) = std::fs::metadata(path) {
156                if meta.len() > cap {
157                    let msg = format!(
158                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
159                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
160                        meta.len(),
161                        cap
162                    );
163                    return Err(ErrorData::invalid_params(msg, None));
164                }
165            }
166        }
167
168        // Acquire cache write lock with timeout guard to prevent infinite hangs.
169        // All lock acquisitions (per-file mutex, cache RwLock) use bounded waits
170        // so a zombie thread from a previous timed-out call cannot block us forever.
171        let read_timeout = std::time::Duration::from_secs(30);
172        let cancelled = Arc::new(AtomicBool::new(false));
173        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
174            let cache_lock = cache_lock.clone();
175            let mode = mode.clone();
176            let crp_mode = ctx.crp_mode;
177            let task_owned = current_task.clone();
178            let path_owned = path.to_string();
179            let cancel_flag = cancelled.clone();
180            let (tx, rx) = std::sync::mpsc::sync_channel(1);
181            std::thread::spawn(move || {
182                let file_lock = per_file_lock(&path_owned);
183
184                // Bounded per-file lock: if a zombie thread still holds it, don't
185                // wait forever. 25s keeps us inside the 30s recv_timeout.
186                let _file_guard = {
187                    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
188                    loop {
189                        if cancel_flag.load(Ordering::Relaxed) {
190                            return;
191                        }
192                        if let Ok(guard) = file_lock.try_lock() {
193                            break guard;
194                        }
195                        if std::time::Instant::now() >= deadline {
196                            tracing::error!(
197                                "ctx_read: per-file lock timeout after 25s for {path_owned}"
198                            );
199                            return;
200                        }
201                        std::thread::sleep(std::time::Duration::from_millis(50));
202                    }
203                };
204
205                if cancel_flag.load(Ordering::Relaxed) {
206                    return;
207                }
208
209                // Bounded cache write-lock: avoids indefinite block when a zombie
210                // thread from a previous timed-out call still holds the lock.
211                let mut cache = {
212                    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
213                    loop {
214                        if cancel_flag.load(Ordering::Relaxed) {
215                            return;
216                        }
217                        if let Ok(guard) = cache_lock.try_write() {
218                            break guard;
219                        }
220                        if std::time::Instant::now() >= deadline {
221                            tracing::error!(
222                                "ctx_read: cache write-lock timeout after 25s for {path_owned}"
223                            );
224                            return;
225                        }
226                        std::thread::sleep(std::time::Duration::from_millis(50));
227                    }
228                };
229
230                let task_ref = task_owned.as_deref();
231                let read_output = if fresh {
232                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
233                        &mut cache,
234                        &path_owned,
235                        &mode,
236                        crp_mode,
237                        task_ref,
238                    )
239                } else {
240                    crate::tools::ctx_read::handle_with_task_resolved(
241                        &mut cache,
242                        &path_owned,
243                        &mode,
244                        crp_mode,
245                        task_ref,
246                    )
247                };
248                let content = read_output.content;
249                let rmode = read_output.resolved_mode;
250                let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
251                let hit = content.contains(" cached ");
252                let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
253                let stats = cache.get_stats();
254                let stats_snapshot = (stats.total_reads, stats.cache_hits);
255                let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
256            });
257            if let Ok(result) = rx.recv_timeout(read_timeout) {
258                result
259            } else {
260                cancelled.store(true, Ordering::Relaxed);
261                tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
262                let msg = format!(
263                    "ERROR: ctx_read timed out after {}s reading {path}. \
264                     The file may be very large or a blocking I/O issue occurred. \
265                     Try mode=\"lines:1-100\" for a partial read.",
266                    read_timeout.as_secs()
267                );
268                return Err(ErrorData::internal_error(msg, None));
269            }
270        };
271
272        let output_tokens = crate::core::tokens::count_tokens(&output);
273        let saved = original.saturating_sub(output_tokens);
274
275        // Session updates (short lock)
276        let mut ensured_root: Option<String> = None;
277        let project_root_snapshot;
278        {
279            let mut session = session_lock.blocking_write();
280            session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
281            if is_cache_hit {
282                session.record_cache_hit();
283            }
284            if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
285                let touched: Vec<String> = session
286                    .files_touched
287                    .iter()
288                    .map(|f| f.path.clone())
289                    .collect();
290                let inferred =
291                    crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
292                if inferred.confidence >= 0.4 {
293                    session.active_structured_intent = Some(inferred);
294                }
295            }
296            let root_missing = session
297                .project_root
298                .as_deref()
299                .is_none_or(|r| r.trim().is_empty());
300            if root_missing {
301                if let Some(root) = crate::core::protocol::detect_project_root(path) {
302                    session.project_root = Some(root.clone());
303                    ensured_root = Some(root);
304                }
305            }
306            project_root_snapshot = session
307                .project_root
308                .clone()
309                .unwrap_or_else(|| ".".to_string());
310        }
311
312        if let Some(root) = ensured_root.as_deref() {
313            crate::core::index_orchestrator::ensure_all_background(root);
314        }
315
316        crate::core::heatmap::record_file_access(path, original, saved);
317
318        // Mode predictor + feedback — no locks needed, uses snapshots from above
319        {
320            let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
321            let density = if output_tokens > 0 {
322                original as f64 / output_tokens as f64
323            } else {
324                1.0
325            };
326            let outcome = crate::core::mode_predictor::ModeOutcome {
327                mode: resolved_mode.clone(),
328                tokens_in: original,
329                tokens_out: output_tokens,
330                density: density.min(1.0),
331            };
332            let mut predictor = crate::core::mode_predictor::ModePredictor::new();
333            predictor.set_project_root(&project_root_snapshot);
334            predictor.record(sig, outcome);
335            predictor.save();
336
337            let ext = std::path::Path::new(path)
338                .extension()
339                .and_then(|e| e.to_str())
340                .unwrap_or("")
341                .to_string();
342            let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
343            let feedback_outcome = crate::core::feedback::CompressionOutcome {
344                session_id: format!("{}", std::process::id()),
345                language: ext,
346                entropy_threshold: thresholds.bpe_entropy,
347                jaccard_threshold: thresholds.jaccard,
348                total_turns: cache_stats.0 as u32,
349                tokens_saved: saved as u64,
350                tokens_original: original as u64,
351                cache_hits: cache_stats.1 as u32,
352                total_reads: cache_stats.0 as u32,
353                task_completed: true,
354                timestamp: chrono::Local::now().to_rfc3339(),
355            };
356            let mut store = crate::core::feedback::FeedbackStore::load();
357            store.project_root = Some(project_root_snapshot.clone());
358            store.record_outcome(feedback_outcome);
359        }
360
361        // NOTE: pipeline_stats, context_ir, and ledger updates are handled by the
362        // dispatch layer's record_call flow. Agent registration requires server.agent_id
363        // which is not available in ToolContext; it will be added when ToolContext is
364        // extended with the remaining server state fields.
365
366        Ok(ToolOutput {
367            text: output,
368            original_tokens: original,
369            saved_tokens: saved,
370            mode: Some(resolved_mode),
371            path: Some(path.to_string()),
372            changed: false,
373        })
374    }
375}
376
377fn auto_degrade_read_mode(mode: &str) -> String {
378    use crate::core::degradation_policy::DegradationVerdictV1;
379    let profile = crate::core::profiles::active_profile();
380    if !profile.degradation.enforce_effective() {
381        return mode.to_string();
382    }
383    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
384    match policy.decision.verdict {
385        DegradationVerdictV1::Ok => mode.to_string(),
386        DegradationVerdictV1::Warn => match mode {
387            "full" => "map".to_string(),
388            other => other.to_string(),
389        },
390        DegradationVerdictV1::Throttle => match mode {
391            "full" | "map" => "signatures".to_string(),
392            other => other.to_string(),
393        },
394        DegradationVerdictV1::Block => "signatures".to_string(),
395    }
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc;

    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
    }

    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let lock_a = per_file_lock("/tmp/test_path_a.txt");
        let lock_b = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
    }

    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let path = "/tmp/test_concurrent_serialization.txt";
        let mut handles = Vec::new();

        for _ in 0..5 {
            let counter = counter.clone();
            let max_concurrent = max_concurrent.clone();
            let path = path.to_string();
            handles.push(std::thread::spawn(move || {
                let lock = per_file_lock(&path);
                let _guard = lock.lock().unwrap();
                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
                max_concurrent.fetch_max(active, Ordering::SeqCst);
                std::thread::sleep(std::time::Duration::from_millis(10));
                counter.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::new();

        for i in 0..4 {
            let counter = counter.clone();
            let max_concurrent = max_concurrent.clone();
            let path = format!("/tmp/test_parallel_{i}.txt");
            handles.push(std::thread::spawn(move || {
                let lock = per_file_lock(&path);
                let _guard = lock.lock().unwrap();
                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
                max_concurrent.fetch_max(active, Ordering::SeqCst);
                std::thread::sleep(std::time::Duration::from_millis(50));
                counter.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
    }

    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
    /// must not block subsequent reads indefinitely. The try_write() loop inside
    /// the spawned thread should respect its 25s deadline and the cancellation flag.
    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        // Simulate a zombie: a background thread holds the write-lock until told
        // to release it. Channel handshakes replace fixed sleeps, so the test does
        // not depend on scheduler timing.
        let (acquired_tx, acquired_rx) = mpsc::channel::<()>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let zombie_lock = cache.clone();
        let zombie = std::thread::spawn(move || {
            let _guard = zombie_lock.blocking_write();
            acquired_tx
                .send(())
                .expect("test thread waits for the acquisition handshake");
            // Hold until released. If the test panics first, `release_tx` is
            // dropped, `recv` errors, and the thread still exits.
            let _ = release_rx.recv();
        });
        acquired_rx
            .recv()
            .expect("zombie thread should acquire the write-lock");

        // A try_read() must fail immediately (zombie holds write-lock).
        assert!(cache.try_read().is_err());

        // A try_write() loop with cancellation must exit promptly.
        let cancel = Arc::new(AtomicBool::new(false));
        let cancel2 = cancel.clone();
        let lock2 = cache.clone();
        let waiter = std::thread::spawn(move || {
            let start = std::time::Instant::now();
            loop {
                if cancel2.load(Ordering::Relaxed) {
                    return (false, start.elapsed());
                }
                if let Ok(_guard) = lock2.try_write() {
                    return (true, start.elapsed());
                }
                std::thread::sleep(std::time::Duration::from_millis(50));
            }
        });

        // Set cancellation after 200ms — the loop should exit quickly.
        std::thread::sleep(std::time::Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < std::time::Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );

        // Once the zombie lets go, the lock must be immediately available again.
        release_tx
            .send(())
            .expect("zombie thread should still be holding the lock");
        zombie.join().unwrap();
        assert!(cache.try_write().is_ok());
    }
}