1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{
10 get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
11};
12use crate::tool_defs::tool_def;
13
/// Returns the process-wide mutex associated with `path`, creating it on first use.
///
/// Every call with the same `path` string yields a handle to the same
/// `Mutex<()>` for as long as the entry stays in the registry, so callers can
/// serialize work on one file while work on other files proceeds in parallel.
///
/// Once the registry holds more than `MAX_ENTRIES` entries, entries whose lock
/// is not referenced by anyone outside the registry are dropped. Locks that a
/// caller still holds a handle to are always kept.
fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
        std::sync::OnceLock::new();
    const MAX_ENTRIES: usize = 500;

    // A poisoned registry lock is recovered instead of propagated as a panic:
    // the map is only touched by the simple operations below, so its contents
    // remain usable, and panicking here would take down every later read.
    let mut map = FILE_LOCKS
        .get_or_init(|| Mutex::new(HashMap::new()))
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    if map.len() > MAX_ENTRIES {
        // A strong count of 1 means the registry holds the only reference.
        map.retain(|_, lock| Arc::strong_count(lock) > 1);
    }

    // Look up by `&str` first so the common "already registered" case does
    // not allocate an owned key.
    if let Some(existing) = map.get(path) {
        return Arc::clone(existing);
    }

    let lock = Arc::new(Mutex::new(()));
    map.insert(path.to_owned(), Arc::clone(&lock));
    lock
}
36
37pub struct CtxReadTool;
38
39impl McpTool for CtxReadTool {
40 fn name(&self) -> &'static str {
41 "ctx_read"
42 }
43
44 fn tool_def(&self) -> Tool {
45 tool_def(
46 "ctx_read",
47 "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
48Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
49 json!({
50 "type": "object",
51 "properties": {
52 "path": { "type": "string", "description": "Absolute file path to read" },
53 "mode": {
54 "type": "string",
55 "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
56 },
57 "start_line": {
58 "type": "integer",
59 "description": "Read from this line number to end of file. Implies fresh=true (disk re-read) to avoid stale snippets."
60 },
61 "fresh": {
62 "type": "boolean",
63 "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
64 }
65 },
66 "required": ["path"]
67 }),
68 )
69 }
70
71 fn handle(
72 &self,
73 args: &Map<String, Value>,
74 ctx: &ToolContext,
75 ) -> Result<ToolOutput, ErrorData> {
76 let path = require_resolved_path(ctx, args, "path")?;
77
78 self.handle_inner(args, ctx, &path)
79 }
80}
81
82impl CtxReadTool {
83 #[allow(clippy::unused_self)]
84 fn handle_inner(
85 &self,
86 args: &Map<String, Value>,
87 ctx: &ToolContext,
88 path: &str,
89 ) -> Result<ToolOutput, ErrorData> {
90 let session_lock = ctx
91 .session
92 .as_ref()
93 .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
94 let cache_lock = ctx
95 .cache
96 .as_ref()
97 .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
98
99 let current_task = {
100 let Ok(session) = tokio::task::block_in_place(|| {
101 let rt = tokio::runtime::Handle::current();
102 rt.block_on(tokio::time::timeout(
103 std::time::Duration::from_secs(5),
104 session_lock.read(),
105 ))
106 }) else {
107 tracing::warn!("session read-lock timeout (5s) in ctx_read for {path}");
108 return Err(ErrorData::internal_error(
109 "session lock timeout — another tool may be holding it. Retry in a moment.",
110 None,
111 ));
112 };
113 session.task.as_ref().map(|t| t.description.clone())
114 };
115 let task_ref = current_task.as_deref();
116
117 let profile = crate::core::profiles::active_profile();
118 let mut mode = if let Some(m) = get_str(args, "mode") {
119 m
120 } else if profile.read.default_mode_effective() == "auto" {
121 if let Ok(cache) = cache_lock.try_read() {
124 crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
125 } else {
126 tracing::debug!(
127 "cache lock contested during auto-mode selection for {path}; \
128 falling back to full"
129 );
130 "full".to_string()
131 }
132 } else {
133 profile.read.default_mode_effective().to_string()
134 };
135
136 let mut fresh = get_bool(args, "fresh").unwrap_or(false);
137 let cache_policy = crate::server::compaction_sync::effective_cache_policy();
138 if cache_policy == "off" {
139 fresh = true;
140 }
141 let start_line = get_int(args, "start_line");
142 if let Some(sl) = start_line {
143 let sl = sl.max(1_i64);
144 if sl > 1 {
148 mode = format!("lines:{sl}-999999");
149 fresh = true;
150 }
151 }
152
153 let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
154 let resolved_agent_id = ctx.agent_id.as_ref().and_then(|a| match a.try_read() {
155 Ok(guard) => guard.clone(),
156 Err(_) => None,
157 });
158 let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
159 path,
160 &mode,
161 task_ref,
162 Some(&ctx.project_root),
163 pressure_action,
164 resolved_agent_id.as_deref(),
165 );
166 if gate_result.budget_blocked {
167 let msg = gate_result
168 .budget_warning
169 .unwrap_or_else(|| "Agent token budget exceeded".to_string());
170 return Err(ErrorData::invalid_params(msg, None));
171 }
172 let budget_warning = gate_result.budget_warning.clone();
173 if let Some(overridden) = gate_result.overridden_mode {
174 mode = overridden;
175 }
176
177 let mode = if crate::tools::ctx_read::is_instruction_file(path) {
178 "full".to_string()
179 } else {
180 auto_degrade_read_mode(&mode)
181 };
182
183 if mode.starts_with("lines:") {
184 fresh = true;
185 }
186
187 if crate::core::binary_detect::is_binary_file(path) {
188 let msg = crate::core::binary_detect::binary_file_message(path);
189 return Err(ErrorData::invalid_params(msg, None));
190 }
191 {
192 let cap = crate::core::limits::max_read_bytes() as u64;
193 if let Ok(meta) = std::fs::metadata(path) {
194 if meta.len() > cap {
195 let msg = format!(
196 "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
197 Use mode=\"lines:1-100\" for partial reads or increase the limit.",
198 meta.len(),
199 cap
200 );
201 return Err(ErrorData::invalid_params(msg, None));
202 }
203 }
204 }
205
206 if !fresh {
209 if let Ok(data_dir) = crate::core::data_dir::lean_ctx_data_dir() {
210 if let Ok(mut cache) = cache_lock.try_write() {
211 crate::server::compaction_sync::sync_if_compacted(&mut cache, &data_dir);
212 }
213 }
214 }
215
216 let read_timeout = std::time::Duration::from_secs(30);
220 let cancelled = Arc::new(AtomicBool::new(false));
221 let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
222 let crp_mode = ctx.crp_mode;
223 let task_ref = current_task.as_deref();
224
225 let fast_result = 'fast: {
226 let file_lock = per_file_lock(path);
227 let Some(_file_guard) = file_lock.try_lock().ok() else {
228 break 'fast None;
229 };
230 let Some(mut cache) = cache_lock.try_write().ok() else {
231 break 'fast None;
232 };
233 let read_output = if fresh {
234 crate::tools::ctx_read::handle_fresh_with_task_resolved(
235 &mut cache, path, &mode, crp_mode, task_ref,
236 )
237 } else {
238 crate::tools::ctx_read::handle_with_task_resolved(
239 &mut cache, path, &mode, crp_mode, task_ref,
240 )
241 };
242 let content = read_output.content;
243 let rmode = read_output.resolved_mode;
244 let orig = cache.get(path).map_or(0, |e| e.original_tokens);
245 let hit = content.contains(" cached ")
246 || content.contains("[unchanged")
247 || content.contains("[delta:");
248 let fref = cache.file_ref_map().get(path).cloned();
249 let stats = cache.get_stats();
250 let stats_snapshot = (stats.total_reads, stats.cache_hits);
251 Some((content, rmode, orig, hit, fref, stats_snapshot))
252 };
253
254 if let Some(result) = fast_result {
255 result
256 } else {
257 let cache_lock = cache_lock.clone();
259 let mode = mode.clone();
260 let task_owned = current_task.clone();
261 let path_owned = path.to_string();
262 let cancel_flag = cancelled.clone();
263 let (tx, rx) = std::sync::mpsc::sync_channel(1);
264 std::thread::spawn(move || {
265 let file_lock = per_file_lock(&path_owned);
266
267 let _file_guard = {
270 let deadline =
271 std::time::Instant::now() + std::time::Duration::from_secs(25);
272 loop {
273 if cancel_flag.load(Ordering::Relaxed) {
274 return;
275 }
276 if let Ok(guard) = file_lock.try_lock() {
277 break guard;
278 }
279 if std::time::Instant::now() >= deadline {
280 tracing::error!(
281 "ctx_read: per-file lock timeout after 25s for {path_owned}"
282 );
283 return;
284 }
285 std::thread::sleep(std::time::Duration::from_millis(50));
286 }
287 };
288
289 if cancel_flag.load(Ordering::Relaxed) {
290 return;
291 }
292
293 let mut cache = {
296 let deadline =
297 std::time::Instant::now() + std::time::Duration::from_secs(25);
298 loop {
299 if cancel_flag.load(Ordering::Relaxed) {
300 return;
301 }
302 if let Ok(guard) = cache_lock.try_write() {
303 break guard;
304 }
305 if std::time::Instant::now() >= deadline {
306 tracing::error!(
307 "ctx_read: cache write-lock timeout after 25s for {path_owned}"
308 );
309 return;
310 }
311 std::thread::sleep(std::time::Duration::from_millis(50));
312 }
313 };
314
315 let task_ref = task_owned.as_deref();
316 let read_output = if fresh {
317 crate::tools::ctx_read::handle_fresh_with_task_resolved(
318 &mut cache,
319 &path_owned,
320 &mode,
321 crp_mode,
322 task_ref,
323 )
324 } else {
325 crate::tools::ctx_read::handle_with_task_resolved(
326 &mut cache,
327 &path_owned,
328 &mode,
329 crp_mode,
330 task_ref,
331 )
332 };
333 let content = read_output.content;
334 let rmode = read_output.resolved_mode;
335 let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
336 let hit = content.contains(" cached ");
337 let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
338 let stats = cache.get_stats();
339 let stats_snapshot = (stats.total_reads, stats.cache_hits);
340 let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
341 });
342 if let Ok(result) = rx.recv_timeout(read_timeout) {
343 result
344 } else {
345 cancelled.store(true, Ordering::Relaxed);
346 tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
347 let msg = format!(
348 "ERROR: ctx_read timed out after {}s reading {path}. \
349 The file may be very large or a blocking I/O issue occurred. \
350 Try mode=\"lines:1-100\" for a partial read.",
351 read_timeout.as_secs()
352 );
353 return Err(ErrorData::internal_error(msg, None));
354 }
355 } };
357
358 if resolved_mode == "error" {
360 return Err(ErrorData::invalid_params(output, None));
361 }
362
363 let output_tokens = crate::core::tokens::count_tokens(&output);
364 let saved = original.saturating_sub(output_tokens);
365
366 let mut ensured_root: Option<String> = None;
368 let project_root_snapshot;
369 {
370 let session_guard = tokio::task::block_in_place(|| {
371 let rt = tokio::runtime::Handle::current();
372 rt.block_on(tokio::time::timeout(
373 std::time::Duration::from_secs(5),
374 session_lock.write(),
375 ))
376 });
377 if let Ok(mut session) = session_guard {
378 session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
379 if is_cache_hit {
380 session.record_cache_hit();
381 }
382 if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
383 let touched: Vec<String> = session
384 .files_touched
385 .iter()
386 .map(|f| f.path.clone())
387 .collect();
388 let inferred =
389 crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
390 if inferred.confidence >= 0.4 {
391 session.active_structured_intent = Some(inferred);
392 }
393 }
394 let root_missing = session
395 .project_root
396 .as_deref()
397 .is_none_or(|r| r.trim().is_empty());
398 if root_missing {
399 if let Some(root) = crate::core::protocol::detect_project_root(path) {
400 session.project_root = Some(root.clone());
401 ensured_root = Some(root);
402 }
403 }
404 project_root_snapshot = session
405 .project_root
406 .clone()
407 .unwrap_or_else(|| ".".to_string());
408 } else {
409 tracing::warn!(
410 "session write-lock timeout (5s) in ctx_read post-update for {path}"
411 );
412 project_root_snapshot = ctx.project_root.clone();
413 }
414 }
415
416 if let Some(root) = ensured_root.as_deref() {
417 crate::core::index_orchestrator::ensure_all_background(root);
418 }
419
420 crate::core::heatmap::record_file_access(path, original, saved);
421
422 {
424 let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
425 let density = if output_tokens > 0 {
426 original as f64 / output_tokens as f64
427 } else {
428 1.0
429 };
430 let outcome = crate::core::mode_predictor::ModeOutcome {
431 mode: resolved_mode.clone(),
432 tokens_in: original,
433 tokens_out: output_tokens,
434 density: density.min(1.0),
435 };
436 let mut predictor = crate::core::mode_predictor::ModePredictor::new();
437 predictor.set_project_root(&project_root_snapshot);
438 predictor.record(sig, outcome);
439 predictor.save();
440
441 let ext = std::path::Path::new(path)
442 .extension()
443 .and_then(|e| e.to_str())
444 .unwrap_or("")
445 .to_string();
446 let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
447 let feedback_outcome = crate::core::feedback::CompressionOutcome {
448 session_id: format!("{}", std::process::id()),
449 language: ext,
450 entropy_threshold: thresholds.bpe_entropy,
451 jaccard_threshold: thresholds.jaccard,
452 total_turns: cache_stats.0 as u32,
453 tokens_saved: saved as u64,
454 tokens_original: original as u64,
455 cache_hits: cache_stats.1 as u32,
456 total_reads: cache_stats.0 as u32,
457 task_completed: true,
458 timestamp: chrono::Local::now().to_rfc3339(),
459 };
460 let mut store = crate::core::feedback::FeedbackStore::load();
461 store.project_root = Some(project_root_snapshot.clone());
462 store.record_outcome(feedback_outcome);
463 }
464
465 if let Some(aid) = resolved_agent_id.as_deref() {
466 crate::core::agent_budget::record_consumption(aid, output_tokens);
467 }
468
469 let final_output = if let Some(ref warning) = budget_warning {
470 format!("{output}\n\n{warning}")
471 } else {
472 output
473 };
474
475 Ok(ToolOutput {
476 text: final_output,
477 original_tokens: original,
478 saved_tokens: saved,
479 mode: Some(resolved_mode),
480 path: Some(path.to_string()),
481 changed: false,
482 })
483 }
484}
485
486fn auto_degrade_read_mode(mode: &str) -> String {
487 use crate::core::degradation_policy::DegradationVerdictV1;
488 let profile = crate::core::profiles::active_profile();
489 if !profile.degradation.enforce_effective() {
490 return mode.to_string();
491 }
492 let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
493 match policy.decision.verdict {
494 DegradationVerdictV1::Ok => mode.to_string(),
495 DegradationVerdictV1::Warn => match mode {
496 "full" => "map".to_string(),
497 other => other.to_string(),
498 },
499 DegradationVerdictV1::Throttle => match mode {
500 "full" | "map" => "signatures".to_string(),
501 other => other.to_string(),
502 },
503 DegradationVerdictV1::Block => "signatures".to_string(),
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;
    use std::time::{Duration, Instant};

    /// Spawns one thread per entry in `paths`; each takes the per-file lock
    /// for its path, holds it for `hold`, and releases it. Returns the highest
    /// number of threads observed inside the locked section at the same time.
    fn peak_concurrency(paths: Vec<String>, hold: Duration) -> usize {
        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = paths
            .into_iter()
            .map(|path| {
                let active = Arc::clone(&active);
                let peak = Arc::clone(&peak);
                thread::spawn(move || {
                    let lock = per_file_lock(&path);
                    let _guard = lock.lock().unwrap();
                    let now_active = active.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now_active, Ordering::SeqCst);
                    thread::sleep(hold);
                    active.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
        peak.load(Ordering::SeqCst)
    }

    /// Local copy of the `start_line` handling in `handle_inner`: starting
    /// from `fresh = false`, returns the resulting `(mode, fresh)` pair.
    fn apply_start_line(initial_mode: &str, start_line: Option<i64>) -> (String, bool) {
        match start_line.map(|line| line.max(1_i64)) {
            Some(line) if line > 1 => (format!("lines:{line}-999999"), true),
            _ => (initial_mode.to_string(), false),
        }
    }

    #[test]
    fn per_file_lock_same_path_returns_same_mutex() {
        let first = per_file_lock("/tmp/test_same_path.txt");
        let second = per_file_lock("/tmp/test_same_path.txt");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn per_file_lock_different_paths_return_different_mutexes() {
        let left = per_file_lock("/tmp/test_path_a.txt");
        let right = per_file_lock("/tmp/test_path_b.txt");
        assert!(!Arc::ptr_eq(&left, &right));
    }

    #[test]
    fn per_file_lock_serializes_concurrent_access() {
        // Five threads contend for the same path.
        let paths = vec!["/tmp/test_concurrent_serialization.txt".to_string(); 5];
        let peak = peak_concurrency(paths, Duration::from_millis(10));
        assert_eq!(peak, 1);
    }

    #[test]
    fn per_file_lock_allows_parallel_different_paths() {
        // Four threads, each with its own path.
        let paths = (0..4)
            .map(|i| format!("/tmp/test_parallel_{i}.txt"))
            .collect();
        let peak = peak_concurrency(paths, Duration::from_millis(50));
        assert!(peak > 1);
    }

    #[test]
    fn zombie_thread_does_not_block_subsequent_cache_access() {
        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));

        // A "zombie" thread grabs the write lock and sits on it for 2s.
        let held_by_zombie = Arc::clone(&cache);
        let _zombie = thread::spawn(move || {
            let _guard = held_by_zombie.blocking_write();
            thread::sleep(Duration::from_secs(2));
        });
        // Give the zombie time to acquire the lock.
        thread::sleep(Duration::from_millis(50));

        assert!(cache.try_read().is_err());

        // A waiter polls for the lock the way the worker thread in
        // `handle_inner` does, stopping when the cancel flag is raised.
        let cancel = Arc::new(AtomicBool::new(false));
        let waiter_cancel = Arc::clone(&cancel);
        let waiter_lock = Arc::clone(&cache);
        let waiter = thread::spawn(move || {
            let started = Instant::now();
            loop {
                if waiter_cancel.load(Ordering::Relaxed) {
                    return (false, started.elapsed());
                }
                if let Ok(_guard) = waiter_lock.try_write() {
                    return (true, started.elapsed());
                }
                thread::sleep(Duration::from_millis(50));
            }
        });

        thread::sleep(Duration::from_millis(200));
        cancel.store(true, Ordering::Relaxed);

        let (acquired, elapsed) = waiter.join().unwrap();
        assert!(
            !acquired,
            "should not have acquired lock while zombie holds it"
        );
        assert!(
            elapsed < Duration::from_secs(1),
            "cancellation should have stopped the loop promptly"
        );
    }

    #[test]
    fn start_line_1_does_not_override_mode() {
        let (mode, fresh) = apply_start_line("auto", Some(1));

        assert_eq!(mode, "auto", "start_line=1 should not change mode");
        assert!(!fresh, "start_line=1 should not force fresh=true");
    }

    #[test]
    fn start_line_greater_than_1_overrides_mode() {
        let (mode, fresh) = apply_start_line("auto", Some(50));

        assert_eq!(mode, "lines:50-999999");
        assert!(fresh, "start_line>1 should force fresh=true");
    }

    #[test]
    fn start_line_none_does_nothing() {
        let (mode, fresh) = apply_start_line("map", None);

        assert_eq!(mode, "map", "absent start_line should preserve mode");
        assert!(!fresh);
    }
}