lean_ctx/tools/registered/
ctx_read.rs1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{get_bool, get_int, get_str, McpTool, ToolContext, ToolOutput};
10use crate::tool_defs::tool_def;
11
/// Returns the process-wide mutex used to serialize `ctx_read` work on `path`.
///
/// While an entry stays in the table, the same `path` string always maps to the
/// same `Arc<Mutex<()>>`. Readers of one file therefore queue up, while different
/// files proceed in parallel. Keys are compared as raw strings; no path
/// normalization happens here.
///
/// Once the table holds more than `MAX_ENTRIES` entries, every entry that nobody
/// outside the table still references (`Arc::strong_count == 1`) is dropped.
/// Locks that are currently handed out are always kept, so eviction never splits
/// two concurrent users of the same path onto different mutexes.
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    const MAX_ENTRIES: usize = 500;

    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();

    // The table only maps names to lock handles; a panic elsewhere cannot leave
    // it logically inconsistent. Recover from poisoning instead of propagating
    // a panic to every later reader.
    let mut map = FILE_LOCKS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    if map.len() > MAX_ENTRIES {
        // The table itself holds one strong reference; anything above that
        // means a caller is still using the lock.
        map.retain(|_, v| Arc::strong_count(v) > 1);
    }

    // Fast path: avoid allocating an owned key when the entry already exists.
    if let Some(lock) = map.get(path) {
        return Arc::clone(lock);
    }
    Arc::clone(
        map.entry(path.to_owned())
            .or_insert_with(|| Arc::new(Mutex::new(()))),
    )
}
34
35pub struct CtxReadTool;
36
37impl McpTool for CtxReadTool {
38 fn name(&self) -> &'static str {
39 "ctx_read"
40 }
41
42 fn tool_def(&self) -> Tool {
43 tool_def(
44 "ctx_read",
45 "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
46Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
47 json!({
48 "type": "object",
49 "properties": {
50 "path": { "type": "string", "description": "Absolute file path to read" },
51 "mode": {
52 "type": "string",
53 "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
54 },
55 "start_line": {
56 "type": "integer",
57 "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
58 },
59 "fresh": {
60 "type": "boolean",
61 "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
62 }
63 },
64 "required": ["path"]
65 }),
66 )
67 }
68
69 fn handle(
70 &self,
71 args: &Map<String, Value>,
72 ctx: &ToolContext,
73 ) -> Result<ToolOutput, ErrorData> {
74 let path = ctx
75 .resolved_path("path")
76 .ok_or_else(|| ErrorData::invalid_params("path is required", None))?
77 .to_string();
78
79 self.handle_inner(args, ctx, &path)
80 }
81}
82
83impl CtxReadTool {
84 #[allow(clippy::unused_self)]
85 fn handle_inner(
86 &self,
87 args: &Map<String, Value>,
88 ctx: &ToolContext,
89 path: &str,
90 ) -> Result<ToolOutput, ErrorData> {
91 let session_lock = ctx
92 .session
93 .as_ref()
94 .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
95 let cache_lock = ctx
96 .cache
97 .as_ref()
98 .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
99
100 let current_task = {
101 let session = session_lock.blocking_read();
102 session.task.as_ref().map(|t| t.description.clone())
103 };
104 let task_ref = current_task.as_deref();
105
106 let profile = crate::core::profiles::active_profile();
107 let mut mode = if let Some(m) = get_str(args, "mode") {
108 m
109 } else if profile.read.default_mode_effective() == "auto" {
110 if let Ok(cache) = cache_lock.try_read() {
113 crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
114 } else {
115 tracing::debug!(
116 "cache lock contested during auto-mode selection for {path}; \
117 falling back to full"
118 );
119 "full".to_string()
120 }
121 } else {
122 profile.read.default_mode_effective().to_string()
123 };
124
125 let mut fresh = get_bool(args, "fresh").unwrap_or(false);
126 let start_line = get_int(args, "start_line");
127 if let Some(sl) = start_line {
128 let sl = sl.max(1_i64);
129 mode = format!("lines:{sl}-999999");
130 fresh = true;
131 }
132
133 let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
134 let resolved_agent_id = ctx
135 .agent_id
136 .as_ref()
137 .and_then(|a| a.blocking_read().clone());
138 let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
139 path,
140 &mode,
141 task_ref,
142 Some(&ctx.project_root),
143 pressure_action,
144 resolved_agent_id.as_deref(),
145 );
146 if gate_result.budget_blocked {
147 let msg = gate_result
148 .budget_warning
149 .unwrap_or_else(|| "Agent token budget exceeded".to_string());
150 return Err(ErrorData::invalid_params(msg, None));
151 }
152 let budget_warning = gate_result.budget_warning.clone();
153 if let Some(overridden) = gate_result.overridden_mode {
154 mode = overridden;
155 }
156
157 let mode = if crate::tools::ctx_read::is_instruction_file(path) {
158 "full".to_string()
159 } else {
160 auto_degrade_read_mode(&mode)
161 };
162
163 if mode.starts_with("lines:") {
164 fresh = true;
165 }
166
167 if crate::core::binary_detect::is_binary_file(path) {
168 let msg = crate::core::binary_detect::binary_file_message(path);
169 return Err(ErrorData::invalid_params(msg, None));
170 }
171 {
172 let cap = crate::core::limits::max_read_bytes() as u64;
173 if let Ok(meta) = std::fs::metadata(path) {
174 if meta.len() > cap {
175 let msg = format!(
176 "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
177 Use mode=\"lines:1-100\" for partial reads or increase the limit.",
178 meta.len(),
179 cap
180 );
181 return Err(ErrorData::invalid_params(msg, None));
182 }
183 }
184 }
185
186 let read_timeout = std::time::Duration::from_secs(30);
190 let cancelled = Arc::new(AtomicBool::new(false));
191 let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
192 let cache_lock = cache_lock.clone();
193 let mode = mode.clone();
194 let crp_mode = ctx.crp_mode;
195 let task_owned = current_task.clone();
196 let path_owned = path.to_string();
197 let cancel_flag = cancelled.clone();
198 let (tx, rx) = std::sync::mpsc::sync_channel(1);
199 std::thread::spawn(move || {
200 let file_lock = per_file_lock(&path_owned);
201
202 let _file_guard = {
205 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
206 loop {
207 if cancel_flag.load(Ordering::Relaxed) {
208 return;
209 }
210 if let Ok(guard) = file_lock.try_lock() {
211 break guard;
212 }
213 if std::time::Instant::now() >= deadline {
214 tracing::error!(
215 "ctx_read: per-file lock timeout after 25s for {path_owned}"
216 );
217 return;
218 }
219 std::thread::sleep(std::time::Duration::from_millis(50));
220 }
221 };
222
223 if cancel_flag.load(Ordering::Relaxed) {
224 return;
225 }
226
227 let mut cache = {
230 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(25);
231 loop {
232 if cancel_flag.load(Ordering::Relaxed) {
233 return;
234 }
235 if let Ok(guard) = cache_lock.try_write() {
236 break guard;
237 }
238 if std::time::Instant::now() >= deadline {
239 tracing::error!(
240 "ctx_read: cache write-lock timeout after 25s for {path_owned}"
241 );
242 return;
243 }
244 std::thread::sleep(std::time::Duration::from_millis(50));
245 }
246 };
247
248 let task_ref = task_owned.as_deref();
249 let read_output = if fresh {
250 crate::tools::ctx_read::handle_fresh_with_task_resolved(
251 &mut cache,
252 &path_owned,
253 &mode,
254 crp_mode,
255 task_ref,
256 )
257 } else {
258 crate::tools::ctx_read::handle_with_task_resolved(
259 &mut cache,
260 &path_owned,
261 &mode,
262 crp_mode,
263 task_ref,
264 )
265 };
266 let content = read_output.content;
267 let rmode = read_output.resolved_mode;
268 let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
269 let hit = content.contains(" cached ");
270 let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
271 let stats = cache.get_stats();
272 let stats_snapshot = (stats.total_reads, stats.cache_hits);
273 let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
274 });
275 if let Ok(result) = rx.recv_timeout(read_timeout) {
276 result
277 } else {
278 cancelled.store(true, Ordering::Relaxed);
279 tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
280 let msg = format!(
281 "ERROR: ctx_read timed out after {}s reading {path}. \
282 The file may be very large or a blocking I/O issue occurred. \
283 Try mode=\"lines:1-100\" for a partial read.",
284 read_timeout.as_secs()
285 );
286 return Err(ErrorData::internal_error(msg, None));
287 }
288 };
289
290 let output_tokens = crate::core::tokens::count_tokens(&output);
291 let saved = original.saturating_sub(output_tokens);
292
293 let mut ensured_root: Option<String> = None;
295 let project_root_snapshot;
296 {
297 let mut session = session_lock.blocking_write();
298 session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
299 if is_cache_hit {
300 session.record_cache_hit();
301 }
302 if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
303 let touched: Vec<String> = session
304 .files_touched
305 .iter()
306 .map(|f| f.path.clone())
307 .collect();
308 let inferred =
309 crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
310 if inferred.confidence >= 0.4 {
311 session.active_structured_intent = Some(inferred);
312 }
313 }
314 let root_missing = session
315 .project_root
316 .as_deref()
317 .is_none_or(|r| r.trim().is_empty());
318 if root_missing {
319 if let Some(root) = crate::core::protocol::detect_project_root(path) {
320 session.project_root = Some(root.clone());
321 ensured_root = Some(root);
322 }
323 }
324 project_root_snapshot = session
325 .project_root
326 .clone()
327 .unwrap_or_else(|| ".".to_string());
328 }
329
330 if let Some(root) = ensured_root.as_deref() {
331 crate::core::index_orchestrator::ensure_all_background(root);
332 }
333
334 crate::core::heatmap::record_file_access(path, original, saved);
335
336 {
338 let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
339 let density = if output_tokens > 0 {
340 original as f64 / output_tokens as f64
341 } else {
342 1.0
343 };
344 let outcome = crate::core::mode_predictor::ModeOutcome {
345 mode: resolved_mode.clone(),
346 tokens_in: original,
347 tokens_out: output_tokens,
348 density: density.min(1.0),
349 };
350 let mut predictor = crate::core::mode_predictor::ModePredictor::new();
351 predictor.set_project_root(&project_root_snapshot);
352 predictor.record(sig, outcome);
353 predictor.save();
354
355 let ext = std::path::Path::new(path)
356 .extension()
357 .and_then(|e| e.to_str())
358 .unwrap_or("")
359 .to_string();
360 let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
361 let feedback_outcome = crate::core::feedback::CompressionOutcome {
362 session_id: format!("{}", std::process::id()),
363 language: ext,
364 entropy_threshold: thresholds.bpe_entropy,
365 jaccard_threshold: thresholds.jaccard,
366 total_turns: cache_stats.0 as u32,
367 tokens_saved: saved as u64,
368 tokens_original: original as u64,
369 cache_hits: cache_stats.1 as u32,
370 total_reads: cache_stats.0 as u32,
371 task_completed: true,
372 timestamp: chrono::Local::now().to_rfc3339(),
373 };
374 let mut store = crate::core::feedback::FeedbackStore::load();
375 store.project_root = Some(project_root_snapshot.clone());
376 store.record_outcome(feedback_outcome);
377 }
378
379 if let Some(aid) = resolved_agent_id.as_deref() {
380 crate::core::agent_budget::record_consumption(aid, output_tokens);
381 }
382
383 let final_output = if let Some(ref warning) = budget_warning {
384 format!("{output}\n\n{warning}")
385 } else {
386 output
387 };
388
389 Ok(ToolOutput {
390 text: final_output,
391 original_tokens: original,
392 saved_tokens: saved,
393 mode: Some(resolved_mode),
394 path: Some(path.to_string()),
395 changed: false,
396 })
397 }
398}
399
400fn auto_degrade_read_mode(mode: &str) -> String {
401 use crate::core::degradation_policy::DegradationVerdictV1;
402 let profile = crate::core::profiles::active_profile();
403 if !profile.degradation.enforce_effective() {
404 return mode.to_string();
405 }
406 let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
407 match policy.decision.verdict {
408 DegradationVerdictV1::Ok => mode.to_string(),
409 DegradationVerdictV1::Warn => match mode {
410 "full" => "map".to_string(),
411 other => other.to_string(),
412 },
413 DegradationVerdictV1::Throttle => match mode {
414 "full" | "map" => "signatures".to_string(),
415 other => other.to_string(),
416 },
417 DegradationVerdictV1::Block => "signatures".to_string(),
418 }
419}
420
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// Polls `counter` until it reaches `target`, giving up after `timeout`.
    ///
    /// Returns `true` if the target was reached. The bound makes a regression
    /// (e.g. distinct paths sharing one mutex) fail instead of hanging.
    fn wait_until_count(counter: &AtomicUsize, target: usize, timeout: Duration) -> bool {
        let deadline = Instant::now() + timeout;
        loop {
            if counter.load(Ordering::SeqCst) >= target {
                return true;
            }
            if Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
    }

    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
    }

    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let lock_a = per_file_lock("/tmp/test_path_a.txt");
        let lock_b = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
    }

    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        let counter = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));
        let path = "/tmp/test_concurrent_serialization.txt";
        let mut handles = Vec::new();

        for _ in 0..5 {
            let counter = counter.clone();
            let max_concurrent = max_concurrent.clone();
            let path = path.to_string();
            handles.push(std::thread::spawn(move || {
                let lock = per_file_lock(&path);
                let _guard = lock.lock().unwrap();
                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
                max_concurrent.fetch_max(active, Ordering::SeqCst);
                // The sleep only widens the window in which a broken lock
                // would let threads overlap; passing does not depend on timing.
                std::thread::sleep(Duration::from_millis(10));
                counter.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        const THREADS: usize = 4;
        let holders = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..THREADS)
            .map(|i| {
                let holders = Arc::clone(&holders);
                std::thread::spawn(move || {
                    let lock = per_file_lock(&format!("/tmp/test_parallel_{i}.txt"));
                    let _guard = lock.lock().unwrap();
                    holders.fetch_add(1, Ordering::SeqCst);
                    // Each thread keeps its guard until every thread holds its
                    // own lock at the same time. This is only possible if
                    // distinct paths map to distinct mutexes.
                    wait_until_count(&holders, THREADS, Duration::from_secs(5))
                })
            })
            .collect();

        for h in handles {
            assert!(
                h.join().unwrap(),
                "locks for distinct paths were not held concurrently"
            );
        }
    }

    #[test]
    fn per_file_lock_eviction_keeps_locks_in_use() {
        let held = per_file_lock("/tmp/test_eviction_keep.txt");
        // Push the table well past its eviction threshold.
        for i in 0..600 {
            drop(per_file_lock(&format!("/tmp/test_eviction_filler_{i}.txt")));
        }
        let again = per_file_lock("/tmp/test_eviction_keep.txt");
        assert!(Arc::ptr_eq(&held, &again));
    }

    /// Models the worker's lock-wait pattern: a thread that never releases the
    /// cache lock must not stop a later waiter from giving up once cancelled.
    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        let zombie_lock = cache.clone();
        // Detached on purpose: the test does not wait for the zombie to finish.
        let _zombie = std::thread::spawn(move || {
            let _guard = zombie_lock.blocking_write();
            std::thread::sleep(Duration::from_secs(2));
        });
        std::thread::sleep(Duration::from_millis(50));

        assert!(cache.try_read().is_err());

        let cancel = Arc::new(AtomicBool::new(false));
        let cancel2 = cancel.clone();
        let lock2 = cache.clone();
        let waiter = std::thread::spawn(move || {
            let start = Instant::now();
            loop {
                if cancel2.load(Ordering::Relaxed) {
                    return (false, start.elapsed());
                }
                if let Ok(_guard) = lock2.try_write() {
                    return (true, start.elapsed());
                }
                std::thread::sleep(Duration::from_millis(50));
            }
        });

        std::thread::sleep(Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );
    }
}