lean_ctx/tools/registered/
ctx_read.rs1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{get_bool, get_int, get_str, McpTool, ToolContext, ToolOutput};
10use crate::tool_defs::tool_def;
11
/// Returns the process-wide mutex used to serialize `ctx_read` work on `path`.
///
/// Every call with the same `path` string returns a clone of the same
/// `Arc<Mutex<()>>`. Different strings get different mutexes, so reads of
/// distinct files can proceed in parallel. Paths are compared as raw strings
/// (no canonicalization), so two spellings of one file get separate locks.
///
/// Entries are never removed, so the map grows with the number of distinct
/// paths seen during the process lifetime.
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();
    let locks = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
    // The map is only touched by this insert-or-clone, which cannot leave it
    // half-updated. Recover from poisoning instead of panicking on every
    // subsequent read for the rest of the process.
    let mut locks = locks
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    Arc::clone(locks.entry(path.to_owned()).or_default())
}
28
29pub struct CtxReadTool;
30
31impl McpTool for CtxReadTool {
32 fn name(&self) -> &'static str {
33 "ctx_read"
34 }
35
36 fn tool_def(&self) -> Tool {
37 tool_def(
38 "ctx_read",
39 "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
40Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
41 json!({
42 "type": "object",
43 "properties": {
44 "path": { "type": "string", "description": "Absolute file path to read" },
45 "mode": {
46 "type": "string",
47 "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
48 },
49 "start_line": {
50 "type": "integer",
51 "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
52 },
53 "fresh": {
54 "type": "boolean",
55 "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
56 }
57 },
58 "required": ["path"]
59 }),
60 )
61 }
62
63 fn handle(
64 &self,
65 args: &Map<String, Value>,
66 ctx: &ToolContext,
67 ) -> Result<ToolOutput, ErrorData> {
68 let path = ctx
69 .resolved_path("path")
70 .ok_or_else(|| ErrorData::invalid_params("path is required", None))?
71 .to_string();
72
73 self.handle_inner(args, ctx, &path)
74 }
75}
76
77impl CtxReadTool {
78 #[allow(clippy::unused_self)]
79 fn handle_inner(
80 &self,
81 args: &Map<String, Value>,
82 ctx: &ToolContext,
83 path: &str,
84 ) -> Result<ToolOutput, ErrorData> {
85 let session_lock = ctx
86 .session
87 .as_ref()
88 .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
89 let cache_lock = ctx
90 .cache
91 .as_ref()
92 .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
93
94 let current_task = {
95 let session = session_lock.blocking_read();
96 session.task.as_ref().map(|t| t.description.clone())
97 };
98 let task_ref = current_task.as_deref();
99
100 let profile = crate::core::profiles::active_profile();
101 let mut mode = if let Some(m) = get_str(args, "mode") {
102 m
103 } else if profile.read.default_mode_effective() == "auto" {
104 if let Ok(cache) = cache_lock.try_read() {
107 crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
108 } else {
109 tracing::debug!(
110 "cache lock contested during auto-mode selection for {path}; \
111 falling back to full"
112 );
113 "full".to_string()
114 }
115 } else {
116 profile.read.default_mode_effective().to_string()
117 };
118
119 let mut fresh = get_bool(args, "fresh").unwrap_or(false);
120 let start_line = get_int(args, "start_line");
121 if let Some(sl) = start_line {
122 let sl = sl.max(1_i64);
123 mode = format!("lines:{sl}-999999");
124 fresh = true;
125 }
126
127 let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
128 let gate_result = crate::server::context_gate::pre_dispatch_read(
129 path,
130 &mode,
131 task_ref,
132 Some(&ctx.project_root),
133 pressure_action,
134 );
135 if let Some(overridden) = gate_result.overridden_mode {
136 mode = overridden;
137 }
138
139 let mode = if crate::tools::ctx_read::is_instruction_file(path) {
140 "full".to_string()
141 } else {
142 auto_degrade_read_mode(&mode)
143 };
144
145 if mode.starts_with("lines:") {
146 fresh = true;
147 }
148
149 if crate::core::binary_detect::is_binary_file(path) {
150 let msg = crate::core::binary_detect::binary_file_message(path);
151 return Err(ErrorData::invalid_params(msg, None));
152 }
153 {
154 let cap = crate::core::limits::max_read_bytes() as u64;
155 if let Ok(meta) = std::fs::metadata(path) {
156 if meta.len() > cap {
157 let msg = format!(
158 "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
159 Use mode=\"lines:1-100\" for partial reads or increase the limit.",
160 meta.len(),
161 cap
162 );
163 return Err(ErrorData::invalid_params(msg, None));
164 }
165 }
166 }
167
168 let read_timeout = std::time::Duration::from_secs(30);
172 let cancelled = Arc::new(AtomicBool::new(false));
173 let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
174 let cache_lock = cache_lock.clone();
175 let mode = mode.clone();
176 let crp_mode = ctx.crp_mode;
177 let task_owned = current_task.clone();
178 let path_owned = path.to_string();
179 let cancel_flag = cancelled.clone();
180 let (tx, rx) = std::sync::mpsc::sync_channel(1);
181 std::thread::spawn(move || {
182 let file_lock = per_file_lock(&path_owned);
183
184 let _file_guard = {
187 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
188 loop {
189 if cancel_flag.load(Ordering::Relaxed) {
190 return;
191 }
192 if let Ok(guard) = file_lock.try_lock() {
193 break guard;
194 }
195 if std::time::Instant::now() >= deadline {
196 tracing::error!(
197 "ctx_read: per-file lock timeout after 25s for {path_owned}"
198 );
199 return;
200 }
201 std::thread::sleep(std::time::Duration::from_millis(50));
202 }
203 };
204
205 if cancel_flag.load(Ordering::Relaxed) {
206 return;
207 }
208
209 let mut cache = {
212 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
213 loop {
214 if cancel_flag.load(Ordering::Relaxed) {
215 return;
216 }
217 if let Ok(guard) = cache_lock.try_write() {
218 break guard;
219 }
220 if std::time::Instant::now() >= deadline {
221 tracing::error!(
222 "ctx_read: cache write-lock timeout after 25s for {path_owned}"
223 );
224 return;
225 }
226 std::thread::sleep(std::time::Duration::from_millis(50));
227 }
228 };
229
230 let task_ref = task_owned.as_deref();
231 let read_output = if fresh {
232 crate::tools::ctx_read::handle_fresh_with_task_resolved(
233 &mut cache,
234 &path_owned,
235 &mode,
236 crp_mode,
237 task_ref,
238 )
239 } else {
240 crate::tools::ctx_read::handle_with_task_resolved(
241 &mut cache,
242 &path_owned,
243 &mode,
244 crp_mode,
245 task_ref,
246 )
247 };
248 let content = read_output.content;
249 let rmode = read_output.resolved_mode;
250 let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
251 let hit = content.contains(" cached ");
252 let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
253 let stats = cache.get_stats();
254 let stats_snapshot = (stats.total_reads, stats.cache_hits);
255 let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
256 });
257 if let Ok(result) = rx.recv_timeout(read_timeout) {
258 result
259 } else {
260 cancelled.store(true, Ordering::Relaxed);
261 tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
262 let msg = format!(
263 "ERROR: ctx_read timed out after {}s reading {path}. \
264 The file may be very large or a blocking I/O issue occurred. \
265 Try mode=\"lines:1-100\" for a partial read.",
266 read_timeout.as_secs()
267 );
268 return Err(ErrorData::internal_error(msg, None));
269 }
270 };
271
272 let output_tokens = crate::core::tokens::count_tokens(&output);
273 let saved = original.saturating_sub(output_tokens);
274
275 let mut ensured_root: Option<String> = None;
277 let project_root_snapshot;
278 {
279 let mut session = session_lock.blocking_write();
280 session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
281 if is_cache_hit {
282 session.record_cache_hit();
283 }
284 if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
285 let touched: Vec<String> = session
286 .files_touched
287 .iter()
288 .map(|f| f.path.clone())
289 .collect();
290 let inferred =
291 crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
292 if inferred.confidence >= 0.4 {
293 session.active_structured_intent = Some(inferred);
294 }
295 }
296 let root_missing = session
297 .project_root
298 .as_deref()
299 .is_none_or(|r| r.trim().is_empty());
300 if root_missing {
301 if let Some(root) = crate::core::protocol::detect_project_root(path) {
302 session.project_root = Some(root.clone());
303 ensured_root = Some(root);
304 }
305 }
306 project_root_snapshot = session
307 .project_root
308 .clone()
309 .unwrap_or_else(|| ".".to_string());
310 }
311
312 if let Some(root) = ensured_root.as_deref() {
313 crate::core::index_orchestrator::ensure_all_background(root);
314 }
315
316 crate::core::heatmap::record_file_access(path, original, saved);
317
318 {
320 let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
321 let density = if output_tokens > 0 {
322 original as f64 / output_tokens as f64
323 } else {
324 1.0
325 };
326 let outcome = crate::core::mode_predictor::ModeOutcome {
327 mode: resolved_mode.clone(),
328 tokens_in: original,
329 tokens_out: output_tokens,
330 density: density.min(1.0),
331 };
332 let mut predictor = crate::core::mode_predictor::ModePredictor::new();
333 predictor.set_project_root(&project_root_snapshot);
334 predictor.record(sig, outcome);
335 predictor.save();
336
337 let ext = std::path::Path::new(path)
338 .extension()
339 .and_then(|e| e.to_str())
340 .unwrap_or("")
341 .to_string();
342 let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
343 let feedback_outcome = crate::core::feedback::CompressionOutcome {
344 session_id: format!("{}", std::process::id()),
345 language: ext,
346 entropy_threshold: thresholds.bpe_entropy,
347 jaccard_threshold: thresholds.jaccard,
348 total_turns: cache_stats.0 as u32,
349 tokens_saved: saved as u64,
350 tokens_original: original as u64,
351 cache_hits: cache_stats.1 as u32,
352 total_reads: cache_stats.0 as u32,
353 task_completed: true,
354 timestamp: chrono::Local::now().to_rfc3339(),
355 };
356 let mut store = crate::core::feedback::FeedbackStore::load();
357 store.project_root = Some(project_root_snapshot.clone());
358 store.record_outcome(feedback_outcome);
359 }
360
361 Ok(ToolOutput {
362 text: output,
363 original_tokens: original,
364 saved_tokens: saved,
365 mode: Some(resolved_mode),
366 path: Some(path.to_string()),
367 changed: false,
368 })
369 }
370}
371
372fn auto_degrade_read_mode(mode: &str) -> String {
373 use crate::core::degradation_policy::DegradationVerdictV1;
374 let profile = crate::core::profiles::active_profile();
375 if !profile.degradation.enforce_effective() {
376 return mode.to_string();
377 }
378 let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
379 match policy.decision.verdict {
380 DegradationVerdictV1::Ok => mode.to_string(),
381 DegradationVerdictV1::Warn => match mode {
382 "full" => "map".to_string(),
383 other => other.to_string(),
384 },
385 DegradationVerdictV1::Throttle => match mode {
386 "full" | "map" => "signatures".to_string(),
387 other => other.to_string(),
388 },
389 DegradationVerdictV1::Block => "signatures".to_string(),
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::{Duration, Instant};

    /// Spawns `workers` threads. Each holds `per_file_lock(path_for(i))` for
    /// `hold`. Returns the largest number of threads seen inside the critical
    /// section at the same time.
    fn peak_concurrency(
        workers: usize,
        hold: Duration,
        path_for: impl Fn(usize) -> String,
    ) -> usize {
        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..workers)
            .map(|i| {
                let active = Arc::clone(&active);
                let peak = Arc::clone(&peak);
                let path = path_for(i);
                thread::spawn(move || {
                    let lock = per_file_lock(&path);
                    let _guard = lock.lock().unwrap();
                    let now_active = active.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now_active, Ordering::SeqCst);
                    thread::sleep(hold);
                    active.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
        peak.load(Ordering::SeqCst)
    }

    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let first = per_file_lock("/tmp/test_same_path.txt");
        let second = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let left = per_file_lock("/tmp/test_path_a.txt");
        let right = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&left, &right));
    }

    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        let peak = peak_concurrency(5, Duration::from_millis(10), |_| {
            "/tmp/test_concurrent_serialization.txt".to_string()
        });
        assert_eq!(peak, 1);
    }

    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        let peak = peak_concurrency(4, Duration::from_millis(50), |i| {
            format!("/tmp/test_parallel_{i}.txt")
        });
        assert!(peak > 1);
    }

    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        // Simulate a worker that outlived its caller and still holds the write lock.
        let _zombie = {
            let held = Arc::clone(&cache);
            thread::spawn(move || {
                let _guard = held.blocking_write();
                thread::sleep(Duration::from_secs(2));
            })
        };
        thread::sleep(Duration::from_millis(50));

        assert!(cache.try_read().is_err());

        // A cancellable poller, shaped like the production lock wait.
        let cancel = Arc::new(AtomicBool::new(false));
        let waiter = {
            let cancel = Arc::clone(&cancel);
            let contended = Arc::clone(&cache);
            thread::spawn(move || {
                let started = Instant::now();
                while !cancel.load(Ordering::Relaxed) {
                    if contended.try_write().is_ok() {
                        return (true, started.elapsed());
                    }
                    thread::sleep(Duration::from_millis(50));
                }
                (false, started.elapsed())
            })
        };

        thread::sleep(Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );
    }
}