1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{
10 get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
11};
12use crate::tool_defs::tool_def;
13
/// Returns the process-wide mutex associated with `path`.
///
/// Every call with the same `path` string yields a clone of the same
/// `Arc<Mutex<()>>` for as long as the entry stays registered, so callers can
/// serialize work on one file without blocking work on other files.
///
/// Once the registry holds more than `MAX_ENTRIES` locks, entries that nobody
/// outside the registry references (strong count of 1) are dropped before the
/// lookup, which keeps the map from growing without bound.
///
/// A poisoned registry mutex is recovered rather than propagated: the map only
/// stores lock handles, so there is no invariant a panicking holder could have
/// broken that would justify panicking in every later caller.
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    const MAX_ENTRIES: usize = 500;
    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();

    let mut registry = FILE_LOCKS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    if registry.len() > MAX_ENTRIES {
        // A strong count of 1 means the registry holds the only reference.
        registry.retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    // Look up by `&str` first so the common hit path does not allocate a key.
    if let Some(existing) = registry.get(path) {
        return Arc::clone(existing);
    }

    let created = Arc::new(Mutex::new(()));
    registry.insert(path.to_owned(), Arc::clone(&created));
    created
}
36
37pub struct CtxReadTool;
38
39impl McpTool for CtxReadTool {
40 fn name(&self) -> &'static str {
41 "ctx_read"
42 }
43
44 fn tool_def(&self) -> Tool {
45 tool_def(
46 "ctx_read",
47 "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
48Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
49 json!({
50 "type": "object",
51 "properties": {
52 "path": { "type": "string", "description": "Absolute file path to read" },
53 "mode": {
54 "type": "string",
55 "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
56 },
57 "start_line": {
58 "type": "integer",
59 "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
60 },
61 "fresh": {
62 "type": "boolean",
63 "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
64 }
65 },
66 "required": ["path"]
67 }),
68 )
69 }
70
71 fn handle(
72 &self,
73 args: &Map<String, Value>,
74 ctx: &ToolContext,
75 ) -> Result<ToolOutput, ErrorData> {
76 let path = require_resolved_path(ctx, args, "path")?;
77
78 self.handle_inner(args, ctx, &path)
79 }
80}
81
82impl CtxReadTool {
83 #[allow(clippy::unused_self)]
84 fn handle_inner(
85 &self,
86 args: &Map<String, Value>,
87 ctx: &ToolContext,
88 path: &str,
89 ) -> Result<ToolOutput, ErrorData> {
90 let session_lock = ctx
91 .session
92 .as_ref()
93 .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
94 let cache_lock = ctx
95 .cache
96 .as_ref()
97 .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
98
99 let current_task = {
100 let session = session_lock.blocking_read();
101 session.task.as_ref().map(|t| t.description.clone())
102 };
103 let task_ref = current_task.as_deref();
104
105 let profile = crate::core::profiles::active_profile();
106 let mut mode = if let Some(m) = get_str(args, "mode") {
107 m
108 } else if profile.read.default_mode_effective() == "auto" {
109 if let Ok(cache) = cache_lock.try_read() {
112 crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
113 } else {
114 tracing::debug!(
115 "cache lock contested during auto-mode selection for {path}; \
116 falling back to full"
117 );
118 "full".to_string()
119 }
120 } else {
121 profile.read.default_mode_effective().to_string()
122 };
123
124 let mut fresh = get_bool(args, "fresh").unwrap_or(false);
125 let start_line = get_int(args, "start_line");
126 if let Some(sl) = start_line {
127 let sl = sl.max(1_i64);
128 if sl > 1 {
132 mode = format!("lines:{sl}-999999");
133 fresh = true;
134 }
135 }
136
137 let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
138 let resolved_agent_id = ctx
139 .agent_id
140 .as_ref()
141 .and_then(|a| a.blocking_read().clone());
142 let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
143 path,
144 &mode,
145 task_ref,
146 Some(&ctx.project_root),
147 pressure_action,
148 resolved_agent_id.as_deref(),
149 );
150 if gate_result.budget_blocked {
151 let msg = gate_result
152 .budget_warning
153 .unwrap_or_else(|| "Agent token budget exceeded".to_string());
154 return Err(ErrorData::invalid_params(msg, None));
155 }
156 let budget_warning = gate_result.budget_warning.clone();
157 if let Some(overridden) = gate_result.overridden_mode {
158 mode = overridden;
159 }
160
161 let mode = if crate::tools::ctx_read::is_instruction_file(path) {
162 "full".to_string()
163 } else {
164 auto_degrade_read_mode(&mode)
165 };
166
167 if mode.starts_with("lines:") {
168 fresh = true;
169 }
170
171 if crate::core::binary_detect::is_binary_file(path) {
172 let msg = crate::core::binary_detect::binary_file_message(path);
173 return Err(ErrorData::invalid_params(msg, None));
174 }
175 {
176 let cap = crate::core::limits::max_read_bytes() as u64;
177 if let Ok(meta) = std::fs::metadata(path) {
178 if meta.len() > cap {
179 let msg = format!(
180 "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
181 Use mode=\"lines:1-100\" for partial reads or increase the limit.",
182 meta.len(),
183 cap
184 );
185 return Err(ErrorData::invalid_params(msg, None));
186 }
187 }
188 }
189
190 let read_timeout = std::time::Duration::from_secs(30);
194 let cancelled = Arc::new(AtomicBool::new(false));
195 let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
196 let crp_mode = ctx.crp_mode;
197 let task_ref = current_task.as_deref();
198
199 let fast_result = 'fast: {
200 let file_lock = per_file_lock(path);
201 let Some(_file_guard) = file_lock.try_lock().ok() else {
202 break 'fast None;
203 };
204 let Some(mut cache) = cache_lock.try_write().ok() else {
205 break 'fast None;
206 };
207 let read_output = if fresh {
208 crate::tools::ctx_read::handle_fresh_with_task_resolved(
209 &mut cache, path, &mode, crp_mode, task_ref,
210 )
211 } else {
212 crate::tools::ctx_read::handle_with_task_resolved(
213 &mut cache, path, &mode, crp_mode, task_ref,
214 )
215 };
216 let content = read_output.content;
217 let rmode = read_output.resolved_mode;
218 let orig = cache.get(path).map_or(0, |e| e.original_tokens);
219 let hit = content.contains(" cached ")
220 || content.contains("[unchanged")
221 || content.contains("[delta:");
222 let fref = cache.file_ref_map().get(path).cloned();
223 let stats = cache.get_stats();
224 let stats_snapshot = (stats.total_reads, stats.cache_hits);
225 Some((content, rmode, orig, hit, fref, stats_snapshot))
226 };
227
228 if let Some(result) = fast_result {
229 result
230 } else {
231 let cache_lock = cache_lock.clone();
233 let mode = mode.clone();
234 let task_owned = current_task.clone();
235 let path_owned = path.to_string();
236 let cancel_flag = cancelled.clone();
237 let (tx, rx) = std::sync::mpsc::sync_channel(1);
238 std::thread::spawn(move || {
239 let file_lock = per_file_lock(&path_owned);
240
241 let _file_guard = {
244 let deadline =
245 std::time::Instant::now() + std::time::Duration::from_secs(25);
246 loop {
247 if cancel_flag.load(Ordering::Relaxed) {
248 return;
249 }
250 if let Ok(guard) = file_lock.try_lock() {
251 break guard;
252 }
253 if std::time::Instant::now() >= deadline {
254 tracing::error!(
255 "ctx_read: per-file lock timeout after 25s for {path_owned}"
256 );
257 return;
258 }
259 std::thread::sleep(std::time::Duration::from_millis(50));
260 }
261 };
262
263 if cancel_flag.load(Ordering::Relaxed) {
264 return;
265 }
266
267 let mut cache = {
270 let deadline =
271 std::time::Instant::now() + std::time::Duration::from_secs(25);
272 loop {
273 if cancel_flag.load(Ordering::Relaxed) {
274 return;
275 }
276 if let Ok(guard) = cache_lock.try_write() {
277 break guard;
278 }
279 if std::time::Instant::now() >= deadline {
280 tracing::error!(
281 "ctx_read: cache write-lock timeout after 25s for {path_owned}"
282 );
283 return;
284 }
285 std::thread::sleep(std::time::Duration::from_millis(50));
286 }
287 };
288
289 let task_ref = task_owned.as_deref();
290 let read_output = if fresh {
291 crate::tools::ctx_read::handle_fresh_with_task_resolved(
292 &mut cache,
293 &path_owned,
294 &mode,
295 crp_mode,
296 task_ref,
297 )
298 } else {
299 crate::tools::ctx_read::handle_with_task_resolved(
300 &mut cache,
301 &path_owned,
302 &mode,
303 crp_mode,
304 task_ref,
305 )
306 };
307 let content = read_output.content;
308 let rmode = read_output.resolved_mode;
309 let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
310 let hit = content.contains(" cached ");
311 let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
312 let stats = cache.get_stats();
313 let stats_snapshot = (stats.total_reads, stats.cache_hits);
314 let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
315 });
316 if let Ok(result) = rx.recv_timeout(read_timeout) {
317 result
318 } else {
319 cancelled.store(true, Ordering::Relaxed);
320 tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
321 let msg = format!(
322 "ERROR: ctx_read timed out after {}s reading {path}. \
323 The file may be very large or a blocking I/O issue occurred. \
324 Try mode=\"lines:1-100\" for a partial read.",
325 read_timeout.as_secs()
326 );
327 return Err(ErrorData::internal_error(msg, None));
328 }
329 } };
331
332 if resolved_mode == "error" {
334 return Err(ErrorData::invalid_params(output, None));
335 }
336
337 let output_tokens = crate::core::tokens::count_tokens(&output);
338 let saved = original.saturating_sub(output_tokens);
339
340 let mut ensured_root: Option<String> = None;
342 let project_root_snapshot;
343 {
344 let mut session = session_lock.blocking_write();
345 session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
346 if is_cache_hit {
347 session.record_cache_hit();
348 }
349 if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
350 let touched: Vec<String> = session
351 .files_touched
352 .iter()
353 .map(|f| f.path.clone())
354 .collect();
355 let inferred =
356 crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
357 if inferred.confidence >= 0.4 {
358 session.active_structured_intent = Some(inferred);
359 }
360 }
361 let root_missing = session
362 .project_root
363 .as_deref()
364 .is_none_or(|r| r.trim().is_empty());
365 if root_missing {
366 if let Some(root) = crate::core::protocol::detect_project_root(path) {
367 session.project_root = Some(root.clone());
368 ensured_root = Some(root);
369 }
370 }
371 project_root_snapshot = session
372 .project_root
373 .clone()
374 .unwrap_or_else(|| ".".to_string());
375 }
376
377 if let Some(root) = ensured_root.as_deref() {
378 crate::core::index_orchestrator::ensure_all_background(root);
379 }
380
381 crate::core::heatmap::record_file_access(path, original, saved);
382
383 {
385 let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
386 let density = if output_tokens > 0 {
387 original as f64 / output_tokens as f64
388 } else {
389 1.0
390 };
391 let outcome = crate::core::mode_predictor::ModeOutcome {
392 mode: resolved_mode.clone(),
393 tokens_in: original,
394 tokens_out: output_tokens,
395 density: density.min(1.0),
396 };
397 let mut predictor = crate::core::mode_predictor::ModePredictor::new();
398 predictor.set_project_root(&project_root_snapshot);
399 predictor.record(sig, outcome);
400 predictor.save();
401
402 let ext = std::path::Path::new(path)
403 .extension()
404 .and_then(|e| e.to_str())
405 .unwrap_or("")
406 .to_string();
407 let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
408 let feedback_outcome = crate::core::feedback::CompressionOutcome {
409 session_id: format!("{}", std::process::id()),
410 language: ext,
411 entropy_threshold: thresholds.bpe_entropy,
412 jaccard_threshold: thresholds.jaccard,
413 total_turns: cache_stats.0 as u32,
414 tokens_saved: saved as u64,
415 tokens_original: original as u64,
416 cache_hits: cache_stats.1 as u32,
417 total_reads: cache_stats.0 as u32,
418 task_completed: true,
419 timestamp: chrono::Local::now().to_rfc3339(),
420 };
421 let mut store = crate::core::feedback::FeedbackStore::load();
422 store.project_root = Some(project_root_snapshot.clone());
423 store.record_outcome(feedback_outcome);
424 }
425
426 if let Some(aid) = resolved_agent_id.as_deref() {
427 crate::core::agent_budget::record_consumption(aid, output_tokens);
428 }
429
430 let final_output = if let Some(ref warning) = budget_warning {
431 format!("{output}\n\n{warning}")
432 } else {
433 output
434 };
435
436 Ok(ToolOutput {
437 text: final_output,
438 original_tokens: original,
439 saved_tokens: saved,
440 mode: Some(resolved_mode),
441 path: Some(path.to_string()),
442 changed: false,
443 })
444 }
445}
446
447fn auto_degrade_read_mode(mode: &str) -> String {
448 use crate::core::degradation_policy::DegradationVerdictV1;
449 let profile = crate::core::profiles::active_profile();
450 if !profile.degradation.enforce_effective() {
451 return mode.to_string();
452 }
453 let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
454 match policy.decision.verdict {
455 DegradationVerdictV1::Ok => mode.to_string(),
456 DegradationVerdictV1::Warn => match mode {
457 "full" => "map".to_string(),
458 other => other.to_string(),
459 },
460 DegradationVerdictV1::Throttle => match mode {
461 "full" | "map" => "signatures".to_string(),
462 other => other.to_string(),
463 },
464 DegradationVerdictV1::Block => "signatures".to_string(),
465 }
466}
467
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// Spawns one thread per entry of `paths`; each thread takes the lock for
    /// its path and holds it for `hold`. Returns the highest number of threads
    /// that were inside their critical section at the same moment.
    fn peak_concurrent_holders(paths: Vec<String>, hold: Duration) -> usize {
        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        // Collect eagerly so every thread is started before the first join.
        let workers: Vec<_> = paths
            .into_iter()
            .map(|path| {
                let active = Arc::clone(&active);
                let peak = Arc::clone(&peak);
                std::thread::spawn(move || {
                    let lock = per_file_lock(&path);
                    let _guard = lock.lock().unwrap();
                    let inside = active.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(inside, Ordering::SeqCst);
                    std::thread::sleep(hold);
                    active.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        peak.load(Ordering::SeqCst)
    }

    /// Test-local copy of the `start_line` rule: values are clamped to at
    /// least 1 and only a start line above 1 switches to a fresh line-range
    /// read. Returns the resulting `(mode, fresh)` pair.
    fn apply_start_line(mode: &str, start_line: Option<i64>) -> (String, bool) {
        match start_line.map(|sl| sl.max(1_i64)) {
            Some(sl) if sl > 1 => (format!("lines:{sl}-999999"), true),
            _ => (mode.to_string(), false),
        }
    }

    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let first = per_file_lock("/tmp/test_same_path.txt");
        let second = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let left = per_file_lock("/tmp/test_path_a.txt");
        let right = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&left, &right));
    }

    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        let paths = vec!["/tmp/test_concurrent_serialization.txt".to_string(); 5];
        let peak = peak_concurrent_holders(paths, Duration::from_millis(10));
        assert_eq!(peak, 1);
    }

    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        let paths = (0..4)
            .map(|i| format!("/tmp/test_parallel_{i}.txt"))
            .collect();
        let peak = peak_concurrent_holders(paths, Duration::from_millis(50));
        assert!(peak > 1);
    }

    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        // A detached thread that keeps the write lock for two seconds.
        let held_by_zombie = Arc::clone(&cache);
        let _zombie = std::thread::spawn(move || {
            let _guard = held_by_zombie.blocking_write();
            std::thread::sleep(Duration::from_secs(2));
        });
        std::thread::sleep(Duration::from_millis(50));

        assert!(cache.try_read().is_err());

        // A second thread polls for the lock until told to stop.
        let cancel = Arc::new(AtomicBool::new(false));
        let waiter = {
            let cancel = Arc::clone(&cancel);
            let contended = Arc::clone(&cache);
            std::thread::spawn(move || {
                let started = Instant::now();
                while !cancel.load(Ordering::Relaxed) {
                    if contended.try_write().is_ok() {
                        return (true, started.elapsed());
                    }
                    std::thread::sleep(Duration::from_millis(50));
                }
                (false, started.elapsed())
            })
        };

        std::thread::sleep(Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );
    }

    #[test]
    fn start_line_1_does_not_override_mode() {
        let (mode, fresh) = apply_start_line("auto", Some(1));

        assert_eq!(mode, "auto", "start_line=1 should not change mode");
        assert!(!fresh, "start_line=1 should not force fresh=true");
    }

    #[test]
    fn start_line_greater_than_1_overrides_mode() {
        let (mode, fresh) = apply_start_line("auto", Some(50));

        assert_eq!(mode, "lines:50-999999");
        assert!(fresh, "start_line>1 should force fresh=true");
    }

    #[test]
    fn start_line_none_does_nothing() {
        let (mode, fresh) = apply_start_line("map", None);

        assert_eq!(mode, "map", "absent start_line should preserve mode");
        assert!(!fresh);
    }
}