Skip to main content

lean_ctx/tools/registered/
ctx_read.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::{Arc, Mutex};
3
4use rmcp::model::Tool;
5use rmcp::ErrorData;
6use serde_json::{json, Map, Value};
7
8use crate::server::tool_trait::{
9    get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
10};
11use crate::tool_defs::tool_def;
12
13/// Per-file lock that serializes concurrent reads of the same path.
14///
15/// When multiple subagents read sequentially through a shared set of files,
16/// they tend to hit the same path at the same time. Without per-file locking
17/// they all contend on the global cache write lock while doing redundant I/O.
18/// This lock ensures only one thread reads a given file from disk; the others
19/// wait cheaply on the per-file mutex, then hit the warm cache.
20///
21/// Backed by the shared `core::path_locks` registry so reads and edits of the
22/// same path coordinate through a single mutex (see issue #320).
23fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
24    crate::core::path_locks::per_file_lock(path)
25}
26
27pub struct CtxReadTool;
28
29impl McpTool for CtxReadTool {
30    fn name(&self) -> &'static str {
31        "ctx_read"
32    }
33
34    fn tool_def(&self) -> Tool {
35        tool_def(
36            "ctx_read",
37            "Read a file. Prefer over native Read/cat/head/tail (cached, compressed).\n\
38             Unchanged re-reads cost ~13 tokens. Auto-selects mode (full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M). fresh=true forces a disk re-read.",
39            json!({
40                "type": "object",
41                "properties": {
42                    "path": { "type": "string", "description": "Absolute file path to read" },
43                    "mode": {
44                        "type": "string",
45                        "description": "Compression mode (default: auto — resolved per file type/size). Explicit 'full' for guaranteed complete content. Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
46                    },
47                    "start_line": {
48                        "type": "integer",
49                        "description": "Start reading from this line (only used when no explicit mode is set, or with mode=lines). Does NOT override explicit modes like map/signatures."
50                    },
51                    "fresh": {
52                        "type": "boolean",
53                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
54                    }
55                },
56                "required": ["path"]
57            }),
58        )
59    }
60
61    fn handle(
62        &self,
63        args: &Map<String, Value>,
64        ctx: &ToolContext,
65    ) -> Result<ToolOutput, ErrorData> {
66        let path = require_resolved_path(ctx, args, "path")?;
67
68        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
69            self.handle_inner(args, ctx, &path)
70        })) {
71            Ok(result) => result,
72            Err(_) => Err(ErrorData::internal_error(
73                format!("ctx_read panicked while processing '{path}'. This is a bug — please report it."),
74                None,
75            )),
76        }
77    }
78}
79
80impl CtxReadTool {
81    #[allow(clippy::unused_self)]
82    fn handle_inner(
83        &self,
84        args: &Map<String, Value>,
85        ctx: &ToolContext,
86        path: &str,
87    ) -> Result<ToolOutput, ErrorData> {
88        let session_lock = ctx
89            .session
90            .as_ref()
91            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
92        let cache_lock = ctx
93            .cache
94            .as_ref()
95            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
96
97        let current_task = {
98            let rt = tokio::runtime::Handle::current();
99            let mut attempt = 0u32;
100            loop {
101                if let Ok(session) = rt.block_on(tokio::time::timeout(
102                    std::time::Duration::from_secs(5),
103                    session_lock.read(),
104                )) {
105                    break session.task.as_ref().map(|t| t.description.clone());
106                }
107                attempt += 1;
108                if attempt >= 3 {
109                    tracing::warn!(
110                        "session read-lock timeout after {attempt} attempts in ctx_read for {path}"
111                    );
112                    return Err(ErrorData::internal_error(
113                        "session lock timeout — another tool may be holding it. Retry in a moment.",
114                        None,
115                    ));
116                }
117                tracing::debug!(
118                    "session read-lock attempt {attempt}/3 timed out for {path}, retrying"
119                );
120                std::thread::sleep(std::time::Duration::from_millis(100 * u64::from(attempt)));
121            }
122        };
123        let task_ref = current_task.as_deref();
124
125        let profile = crate::core::profiles::active_profile();
126        let explicit_mode_arg = get_str(args, "mode");
127        let explicit_mode = explicit_mode_arg.is_some();
128        let mut mode = if let Some(m) = explicit_mode_arg {
129            m
130        } else if profile.read.default_mode_effective() == "auto" {
131            if let Ok(cache) = cache_lock.try_read() {
132                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
133            } else {
134                tracing::debug!(
135                    "cache lock contested during auto-mode selection for {path}; \
136                     falling back to full"
137                );
138                "full".to_string()
139            }
140        } else {
141            profile.read.default_mode_effective().to_string()
142        };
143        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
144        let cache_policy = crate::server::compaction_sync::effective_cache_policy();
145        if cache_policy == "off" {
146            fresh = true;
147        }
148        let start_line = get_int(args, "start_line");
149        if let Some(sl) = start_line {
150            let sl = sl.max(1_i64);
151            if sl > 1 {
152                fresh = true;
153                // Only override mode when no explicit mode was requested,
154                // or when the explicit mode is already a lines range.
155                // If the caller explicitly set mode=map/signatures/etc.,
156                // start_line must not clobber it (GitHub #259).
157                if !explicit_mode || mode.starts_with("lines") {
158                    mode = format!("lines:{sl}-999999");
159                }
160            }
161        }
162
163        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
164        let resolved_agent_id = ctx.agent_id.as_ref().and_then(|a| match a.try_read() {
165            Ok(guard) => guard.clone(),
166            Err(_) => None,
167        });
168        let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
169            path,
170            &mode,
171            task_ref,
172            Some(&ctx.project_root),
173            pressure_action,
174            resolved_agent_id.as_deref(),
175        );
176        if gate_result.budget_blocked {
177            let msg = gate_result
178                .budget_warning
179                .unwrap_or_else(|| "Agent token budget exceeded".to_string());
180            return Err(ErrorData::invalid_params(msg, None));
181        }
182        let budget_warning = gate_result.budget_warning.clone();
183        if let Some(overridden) = gate_result.overridden_mode {
184            mode = overridden;
185        }
186
187        let (mode, degrade_warning) = if crate::tools::ctx_read::is_instruction_file(path) {
188            ("full".to_string(), None)
189        } else {
190            auto_degrade_read_mode(&mode)
191        };
192
193        if mode.starts_with("lines:") {
194            fresh = true;
195        }
196
197        if crate::core::binary_detect::is_binary_file(path) {
198            let msg = crate::core::binary_detect::binary_file_message(path);
199            return Err(ErrorData::invalid_params(msg, None));
200        }
201        {
202            let cap = crate::core::limits::max_read_bytes() as u64;
203            if let Ok(meta) = std::fs::metadata(path) {
204                if meta.len() > cap {
205                    let msg = format!(
206                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
207                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
208                        meta.len(),
209                        cap
210                    );
211                    return Err(ErrorData::invalid_params(msg, None));
212                }
213            }
214        }
215
216        // Compaction-aware: if host compacted since last check, reset delivery flags
217        // so post-compaction reads deliver full content instead of stubs.
218        if !fresh {
219            if let Ok(data_dir) = crate::core::data_dir::lean_ctx_data_dir() {
220                if let Ok(mut cache) = cache_lock.try_write() {
221                    crate::server::compaction_sync::sync_if_compacted(&mut cache, &data_dir);
222                }
223            }
224        }
225
226        // Fast path: if both per-file lock and cache write-lock are immediately
227        // available, execute inline without spawning a thread. This avoids thread +
228        // channel overhead for the ~90% of calls that are cache hits.
229        let read_timeout = std::time::Duration::from_secs(30);
230        let cancelled = Arc::new(AtomicBool::new(false));
231        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
232            let crp_mode = ctx.crp_mode;
233            let task_ref = current_task.as_deref();
234
235            let fast_result = 'fast: {
236                let file_lock = per_file_lock(path);
237                let Some(_file_guard) = file_lock.try_lock().ok() else {
238                    break 'fast None;
239                };
240
241                // Phase 1 (shared lock): the dominant case is re-reading an
242                // unchanged file in full mode. Serve that stub under a *read*
243                // lock so parallel reads of distinct files run concurrently
244                // instead of serializing on the global write lock.
245                if !fresh && mode == "full" {
246                    if let Ok(cache) = cache_lock.try_read() {
247                        if let Some(read_output) =
248                            crate::tools::ctx_read::try_stub_hit_readonly(&cache, path)
249                        {
250                            let content = read_output.content;
251                            let rmode = read_output.resolved_mode;
252                            let orig = cache.get(path).map_or(0, |e| e.original_tokens);
253                            let hit = content.contains(" cached ")
254                                || content.contains("[unchanged")
255                                || content.contains("[delta:");
256                            let fref = cache.file_ref_map().get(path).cloned();
257                            let stats = cache.get_stats();
258                            let stats_snapshot = (stats.total_reads(), stats.cache_hits());
259                            break 'fast Some((content, rmode, orig, hit, fref, stats_snapshot));
260                        }
261                    }
262                }
263
264                // Phase 2 (write lock): cache miss, changed file, or non-stub
265                // modes (map/signatures/diff/lines) that mutate cache state.
266                let Some(mut cache) = cache_lock.try_write().ok() else {
267                    break 'fast None;
268                };
269                let read_output = if fresh {
270                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
271                        &mut cache, path, &mode, crp_mode, task_ref,
272                    )
273                } else {
274                    crate::tools::ctx_read::handle_with_task_resolved(
275                        &mut cache, path, &mode, crp_mode, task_ref,
276                    )
277                };
278                let content = read_output.content;
279                let rmode = read_output.resolved_mode;
280                let orig = cache.get(path).map_or(0, |e| e.original_tokens);
281                let hit = content.contains(" cached ")
282                    || content.contains("[unchanged")
283                    || content.contains("[delta:");
284                let fref = cache.file_ref_map().get(path).cloned();
285                let stats = cache.get_stats();
286                let stats_snapshot = (stats.total_reads(), stats.cache_hits());
287                Some((content, rmode, orig, hit, fref, stats_snapshot))
288            };
289
290            if let Some(result) = fast_result {
291                result
292            } else {
293                // Slow path: spawn thread with bounded timeout for contended locks.
294                let cache_lock = cache_lock.clone();
295                let mode = mode.clone();
296                let task_owned = current_task.clone();
297                let path_owned = path.to_string();
298                let cancel_flag = cancelled.clone();
299                let (tx, rx) = std::sync::mpsc::sync_channel(1);
300                std::thread::spawn(move || {
301                    let file_lock = per_file_lock(&path_owned);
302
303                    // Bounded per-file lock: if a zombie thread still holds it, don't
304                    // wait forever. 25s keeps us inside the 30s recv_timeout.
305                    let _file_guard = {
306                        let deadline =
307                            std::time::Instant::now() + std::time::Duration::from_secs(25);
308                        loop {
309                            if cancel_flag.load(Ordering::Relaxed) {
310                                return;
311                            }
312                            if let Ok(guard) = file_lock.try_lock() {
313                                break guard;
314                            }
315                            if std::time::Instant::now() >= deadline {
316                                tracing::error!(
317                                    "ctx_read: per-file lock timeout after 25s for {path_owned}"
318                                );
319                                let _ = tx.send((
320                                    format!("per-file lock contention for {path_owned} — retry in a moment"),
321                                    "error".to_string(), 0, false, None, (0, 0),
322                                ));
323                                return;
324                            }
325                            std::thread::sleep(std::time::Duration::from_millis(50));
326                        }
327                    };
328
329                    if cancel_flag.load(Ordering::Relaxed) {
330                        return;
331                    }
332
333                    // Bounded cache write-lock: avoids indefinite block when a zombie
334                    // thread from a previous timed-out call still holds the lock.
335                    let mut cache = {
336                        let deadline =
337                            std::time::Instant::now() + std::time::Duration::from_secs(25);
338                        loop {
339                            if cancel_flag.load(Ordering::Relaxed) {
340                                return;
341                            }
342                            if let Ok(guard) = cache_lock.try_write() {
343                                break guard;
344                            }
345                            if std::time::Instant::now() >= deadline {
346                                tracing::error!(
347                                    "ctx_read: cache write-lock timeout after 25s for {path_owned}"
348                                );
349                                let _ = tx.send((
350                                    format!("cache lock contention for {path_owned} — retry in a moment"),
351                                    "error".to_string(), 0, false, None, (0, 0),
352                                ));
353                                return;
354                            }
355                            std::thread::sleep(std::time::Duration::from_millis(50));
356                        }
357                    };
358
359                    let task_ref = task_owned.as_deref();
360                    let read_output = if fresh {
361                        crate::tools::ctx_read::handle_fresh_with_task_resolved(
362                            &mut cache,
363                            &path_owned,
364                            &mode,
365                            crp_mode,
366                            task_ref,
367                        )
368                    } else {
369                        crate::tools::ctx_read::handle_with_task_resolved(
370                            &mut cache,
371                            &path_owned,
372                            &mode,
373                            crp_mode,
374                            task_ref,
375                        )
376                    };
377                    let content = read_output.content;
378                    let rmode = read_output.resolved_mode;
379                    let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
380                    let hit = content.contains(" cached ");
381                    let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
382                    let stats = cache.get_stats();
383                    let stats_snapshot = (stats.total_reads(), stats.cache_hits());
384                    let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
385                });
386                if let Ok(result) = rx.recv_timeout(read_timeout) {
387                    result
388                } else {
389                    cancelled.store(true, Ordering::Relaxed);
390                    tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
391                    let msg = format!(
392                        "ERROR: ctx_read timed out after {}s reading {path}. \
393                     The file may be very large or a blocking I/O issue occurred. \
394                     Try mode=\"lines:1-100\" for a partial read.",
395                        read_timeout.as_secs()
396                    );
397                    return Err(ErrorData::internal_error(msg, None));
398                }
399            } // end else (slow path)
400        };
401
402        // Convert error results to proper MCP ErrorData instead of success body
403        if resolved_mode == "error" {
404            return Err(ErrorData::invalid_params(output, None));
405        }
406
407        let output_tokens = crate::core::tokens::count_tokens(&output);
408        let saved = original.saturating_sub(output_tokens);
409
410        // Session updates (bounded lock — 10s timeout, read already succeeded)
411        let mut ensured_root: Option<String> = None;
412        let project_root_snapshot;
413        {
414            let rt = tokio::runtime::Handle::current();
415            let session_guard = rt.block_on(tokio::time::timeout(
416                std::time::Duration::from_secs(10),
417                session_lock.write(),
418            ));
419            if let Ok(mut session) = session_guard {
420                session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
421                // Auto-generate file summary from output content
422                let file_summary = extract_file_summary(&output, path);
423                if !file_summary.is_empty() {
424                    session.set_file_summary(path, &file_summary);
425                }
426                if is_cache_hit {
427                    session.record_cache_hit();
428                }
429                if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
430                    let touched: Vec<String> = session
431                        .files_touched
432                        .iter()
433                        .map(|f| f.path.clone())
434                        .collect();
435                    let inferred =
436                        crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
437                    if inferred.confidence >= 0.4 {
438                        session.active_structured_intent = Some(inferred);
439                    }
440                }
441                // Auto-infer task every 5th file read if not explicitly set
442                if session.task.is_none() && session.stats.files_read % 5 == 0 {
443                    session.auto_infer_task();
444                }
445                let root_missing = session
446                    .project_root
447                    .as_deref()
448                    .is_none_or(|r| r.trim().is_empty());
449                if root_missing {
450                    if let Some(root) = crate::core::protocol::detect_project_root(path) {
451                        session.project_root = Some(root.clone());
452                        ensured_root = Some(root);
453                    }
454                }
455                project_root_snapshot = session
456                    .project_root
457                    .clone()
458                    .unwrap_or_else(|| ".".to_string());
459            } else {
460                tracing::warn!(
461                    "session write-lock timeout (5s) in ctx_read post-update for {path}"
462                );
463                project_root_snapshot = ctx.project_root.clone();
464            }
465        }
466
467        if let Some(root) = ensured_root.as_deref() {
468            crate::core::index_orchestrator::ensure_all_background(root);
469        }
470
471        // Telemetry + learning are pure side-effects that never influence this
472        // response, yet they did synchronous disk I/O on every read (heatmap
473        // append, ModePredictor load+save, FeedbackStore load). Push them off
474        // the hot path so reads — especially cache-hit stubs — return without
475        // waiting on disk (#149).
476        {
477            let path_bg = path.to_string();
478            let resolved_mode_bg = resolved_mode.clone();
479            let project_root_bg = project_root_snapshot.clone();
480            let (turns, hits) = cache_stats;
481            std::thread::spawn(move || {
482                crate::core::heatmap::record_file_access(&path_bg, original, saved);
483
484                let sig = crate::core::mode_predictor::FileSignature::from_path(&path_bg, original);
485                let density = if output_tokens > 0 {
486                    original as f64 / output_tokens as f64
487                } else {
488                    1.0
489                };
490                let outcome = crate::core::mode_predictor::ModeOutcome {
491                    mode: resolved_mode_bg,
492                    tokens_in: original,
493                    tokens_out: output_tokens,
494                    density: density.min(1.0),
495                };
496                let mut predictor = crate::core::mode_predictor::ModePredictor::new();
497                predictor.set_project_root(&project_root_bg);
498                predictor.record(sig, outcome);
499                predictor.save();
500
501                let ext = std::path::Path::new(&path_bg)
502                    .extension()
503                    .and_then(|e| e.to_str())
504                    .unwrap_or("")
505                    .to_string();
506                let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(&path_bg);
507                let feedback_outcome = crate::core::feedback::CompressionOutcome {
508                    session_id: format!("{}", std::process::id()),
509                    language: ext,
510                    entropy_threshold: thresholds.bpe_entropy,
511                    jaccard_threshold: thresholds.jaccard,
512                    total_turns: turns as u32,
513                    tokens_saved: saved as u64,
514                    tokens_original: original as u64,
515                    cache_hits: hits as u32,
516                    total_reads: turns as u32,
517                    task_completed: true,
518                    timestamp: chrono::Local::now().to_rfc3339(),
519                };
520                let mut store = crate::core::feedback::FeedbackStore::load();
521                store.project_root = Some(project_root_bg);
522                store.record_outcome(feedback_outcome);
523            });
524        }
525
526        if let Some(aid) = resolved_agent_id.as_deref() {
527            crate::core::agent_budget::record_consumption(aid, output_tokens);
528        }
529
530        // Cross-source hints: if a graph index exists and has cross-source edges
531        // pointing to this file, append compact hints so the agent knows about
532        // related issues/PRs/schemas without a separate tool call.
533        let hints_suffix = {
534            if let Some(index) = crate::core::graph_index::ProjectIndex::load(&ctx.project_root) {
535                let hints = crate::core::cross_source_hints::hints_for_file(
536                    path,
537                    &index.edges,
538                    &ctx.project_root,
539                );
540                if hints.is_empty() {
541                    String::new()
542                } else {
543                    crate::core::cross_source_hints::format_hints(&hints)
544                }
545            } else {
546                String::new()
547            }
548        };
549
550        let mut warnings = Vec::new();
551        if let Some(ref w) = budget_warning {
552            warnings.push(w.as_str());
553        }
554        if let Some(ref w) = degrade_warning {
555            warnings.push(w.as_str());
556        }
557        let final_output = if !warnings.is_empty() {
558            format!("{output}{hints_suffix}\n\n{}", warnings.join("\n"))
559        } else if hints_suffix.is_empty() {
560            output
561        } else {
562            format!("{output}{hints_suffix}")
563        };
564
565        Ok(ToolOutput {
566            text: final_output,
567            original_tokens: original,
568            saved_tokens: saved,
569            mode: Some(resolved_mode),
570            path: Some(path.to_string()),
571            changed: false,
572        })
573    }
574}
575
576fn apply_verdict(
577    mode: &str,
578    verdict: crate::core::degradation_policy::DegradationVerdictV1,
579) -> (String, bool) {
580    use crate::core::degradation_policy::DegradationVerdictV1;
581    match verdict {
582        DegradationVerdictV1::Ok => (mode.to_string(), false),
583        DegradationVerdictV1::Warn => match mode {
584            "full" => ("map".to_string(), true),
585            other => (other.to_string(), false),
586        },
587        DegradationVerdictV1::Throttle => match mode {
588            "full" | "map" => ("signatures".to_string(), true),
589            other => (other.to_string(), false),
590        },
591        DegradationVerdictV1::Block => {
592            if mode == "signatures" {
593                ("signatures".to_string(), false)
594            } else {
595                ("signatures".to_string(), true)
596            }
597        }
598    }
599}
600
601fn auto_degrade_read_mode(mode: &str) -> (String, Option<String>) {
602    if crate::core::config::Config::load().no_degrade_effective() {
603        return (mode.to_string(), None);
604    }
605    let profile = crate::core::profiles::active_profile();
606    if !profile.degradation.enforce_effective() {
607        return (mode.to_string(), None);
608    }
609    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
610    let (new_mode, degraded) = apply_verdict(mode, policy.decision.verdict);
611    let warning = if degraded {
612        Some(format!(
613            "⚠ Context pressure: mode={mode} was downgraded to mode={new_mode} \
614             (verdict: {:?}). Use start_line=1 to bypass, or run ctx_compress to free budget.",
615            policy.decision.verdict
616        ))
617    } else {
618        None
619    };
620    (new_mode, warning)
621}
622
623fn extract_file_summary(output: &str, path: &str) -> String {
624    let hint = crate::core::auto_findings::extract_content_hint(output);
625    if !hint.is_empty() {
626        return hint;
627    }
628    let ext = std::path::Path::new(path)
629        .extension()
630        .and_then(|e| e.to_str())
631        .unwrap_or("");
632    let line_count = output.lines().count();
633    if line_count > 5 {
634        format!("{ext} file, {line_count} lines")
635    } else {
636        String::new()
637    }
638}
639
640#[cfg(test)]
641mod tests {
642    use super::*;
643    use std::sync::atomic::{AtomicUsize, Ordering};
644
645    #[test]
646    fn per_file_lock_same_path_returns_same_mutex() {
647        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
648        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
649        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
650    }
651
652    #[test]
653    fn per_file_lock_different_paths_return_different_mutexes() {
654        let lock_a = per_file_lock("/tmp/test_path_a.txt");
655        let lock_b = per_file_lock("/tmp/test_path_b.txt");
656        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
657    }
658
659    #[test]
660    fn per_file_lock_serializes_concurrent_access() {
661        let counter = Arc::new(AtomicUsize::new(0));
662        let max_concurrent = Arc::new(AtomicUsize::new(0));
663        let path = "/tmp/test_concurrent_serialization.txt";
664        let mut handles = Vec::new();
665
666        for _ in 0..5 {
667            let counter = counter.clone();
668            let max_concurrent = max_concurrent.clone();
669            let path = path.to_string();
670            handles.push(std::thread::spawn(move || {
671                let lock = per_file_lock(&path);
672                let _guard = lock.lock().unwrap();
673                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
674                max_concurrent.fetch_max(active, Ordering::SeqCst);
675                std::thread::sleep(std::time::Duration::from_millis(10));
676                counter.fetch_sub(1, Ordering::SeqCst);
677            }));
678        }
679
680        for h in handles {
681            h.join().unwrap();
682        }
683
684        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
685    }
686
687    #[test]
688    fn per_file_lock_allows_parallel_different_paths() {
689        let counter = Arc::new(AtomicUsize::new(0));
690        let max_concurrent = Arc::new(AtomicUsize::new(0));
691        let mut handles = Vec::new();
692
693        for i in 0..4 {
694            let counter = counter.clone();
695            let max_concurrent = max_concurrent.clone();
696            let path = format!("/tmp/test_parallel_{i}.txt");
697            handles.push(std::thread::spawn(move || {
698                let lock = per_file_lock(&path);
699                let _guard = lock.lock().unwrap();
700                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
701                max_concurrent.fetch_max(active, Ordering::SeqCst);
702                std::thread::sleep(std::time::Duration::from_millis(50));
703                counter.fetch_sub(1, Ordering::SeqCst);
704            }));
705        }
706
707        for h in handles {
708            h.join().unwrap();
709        }
710
711        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
712    }
713
714    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
715    /// must not block subsequent reads indefinitely. The try_write() loop inside
716    /// the spawned thread should respect its 25s deadline and the cancellation flag.
717    #[test]
718    fn zombie_thread_does_not_block_subsequent_cache_access() {
719        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));
720
721        // Simulate a zombie: hold the write-lock on a background thread for 2s.
722        let zombie_lock = cache.clone();
723        let _zombie = std::thread::spawn(move || {
724            let _guard = zombie_lock.blocking_write();
725            std::thread::sleep(std::time::Duration::from_secs(2));
726        });
727        std::thread::sleep(std::time::Duration::from_millis(50));
728
729        // A try_read() must fail immediately (zombie holds write-lock).
730        assert!(cache.try_read().is_err());
731
732        // A try_write() loop with cancellation must exit promptly.
733        let cancel = Arc::new(AtomicBool::new(false));
734        let cancel2 = cancel.clone();
735        let lock2 = cache.clone();
736        let waiter = std::thread::spawn(move || {
737            let start = std::time::Instant::now();
738            loop {
739                if cancel2.load(Ordering::Relaxed) {
740                    return (false, start.elapsed());
741                }
742                if let Ok(_guard) = lock2.try_write() {
743                    return (true, start.elapsed());
744                }
745                std::thread::sleep(std::time::Duration::from_millis(50));
746            }
747        });
748
749        // Set cancellation after 200ms — the loop should exit quickly.
750        std::thread::sleep(std::time::Duration::from_millis(200));
751        cancel.store(true, Ordering::Relaxed);
752
753        let (acquired, elapsed) = waiter.join().unwrap();
754        assert!(
755            !acquired,
756            "should not have acquired lock while zombie holds it"
757        );
758        assert!(
759            elapsed < std::time::Duration::from_secs(1),
760            "cancellation should have stopped the loop promptly"
761        );
762    }
763
764    // -- Regression: GitHub Issue #253 + #259 --
765    // Helper that mirrors the runtime start_line logic.
766    fn apply_start_line(
767        mode: &mut String,
768        fresh: &mut bool,
769        explicit_mode: bool,
770        start_line: Option<i64>,
771    ) {
772        if let Some(sl) = start_line {
773            let sl = sl.max(1_i64);
774            if sl <= 1 {
775                return;
776            }
777            *fresh = true;
778            if !explicit_mode || mode.starts_with("lines") {
779                *mode = format!("lines:{sl}-999999");
780            }
781        }
782    }
783
784    #[test]
785    fn start_line_1_does_not_override_mode() {
786        let mut mode = "auto".to_string();
787        let mut fresh = false;
788        apply_start_line(&mut mode, &mut fresh, false, Some(1));
789        assert_eq!(mode, "auto", "start_line=1 should not change mode");
790        assert!(!fresh, "start_line=1 should not force fresh=true");
791    }
792
793    #[test]
794    fn start_line_gt1_overrides_implicit_mode() {
795        let mut mode = "auto".to_string();
796        let mut fresh = false;
797        apply_start_line(&mut mode, &mut fresh, false, Some(50));
798        assert_eq!(mode, "lines:50-999999");
799        assert!(fresh);
800    }
801
802    #[test]
803    fn start_line_gt1_does_not_override_explicit_map() {
804        // GitHub #259: mode=map + start_line=50 → mode stays map
805        let mut mode = "map".to_string();
806        let mut fresh = false;
807        apply_start_line(&mut mode, &mut fresh, true, Some(50));
808        assert_eq!(
809            mode, "map",
810            "explicit mode=map must not be clobbered by start_line"
811        );
812        assert!(fresh, "start_line>1 should still force fresh");
813    }
814
815    #[test]
816    fn start_line_gt1_does_not_override_explicit_signatures() {
817        let mut mode = "signatures".to_string();
818        let mut fresh = false;
819        apply_start_line(&mut mode, &mut fresh, true, Some(100));
820        assert_eq!(mode, "signatures");
821        assert!(fresh);
822    }
823
824    #[test]
825    fn start_line_gt1_honors_explicit_lines_mode() {
826        let mut mode = "lines:1-50".to_string();
827        let mut fresh = false;
828        apply_start_line(&mut mode, &mut fresh, true, Some(30));
829        assert_eq!(
830            mode, "lines:30-999999",
831            "explicit lines mode should accept start_line override"
832        );
833        assert!(fresh);
834    }
835
836    #[test]
837    fn start_line_none_does_nothing() {
838        let mut mode = "map".to_string();
839        let mut fresh = false;
840        apply_start_line(&mut mode, &mut fresh, true, None);
841        assert_eq!(mode, "map");
842        assert!(!fresh);
843    }
844
845    #[test]
846    fn start_line_1_with_explicit_mode_preserves_it() {
847        // OpenCode sends start_line=1 + mode=map — both should be preserved
848        let mut mode = "map".to_string();
849        let mut fresh = false;
850        apply_start_line(&mut mode, &mut fresh, true, Some(1));
851        assert_eq!(mode, "map");
852        assert!(!fresh);
853    }
854
855    // -- Regression: GitHub Issue #262 --
856    // auto_degrade_read_mode must produce a warning when mode is downgraded.
857
858    use crate::core::degradation_policy::DegradationVerdictV1;
859
860    #[test]
861    fn verdict_ok_does_not_degrade() {
862        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Ok);
863        assert_eq!(mode, "full");
864        assert!(!degraded);
865    }
866
867    #[test]
868    fn verdict_warn_degrades_full_to_map() {
869        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
870        assert_eq!(mode, "map");
871        assert!(degraded, "full→map must be flagged as degraded");
872    }
873
874    #[test]
875    fn verdict_warn_keeps_map() {
876        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Warn);
877        assert_eq!(mode, "map");
878        assert!(!degraded, "map is not degraded under Warn");
879    }
880
881    #[test]
882    fn verdict_warn_keeps_signatures() {
883        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Warn);
884        assert_eq!(mode, "signatures");
885        assert!(!degraded);
886    }
887
888    #[test]
889    fn verdict_throttle_degrades_full_to_signatures() {
890        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Throttle);
891        assert_eq!(mode, "signatures");
892        assert!(degraded);
893    }
894
895    #[test]
896    fn verdict_throttle_degrades_map_to_signatures() {
897        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Throttle);
898        assert_eq!(mode, "signatures");
899        assert!(degraded);
900    }
901
902    #[test]
903    fn verdict_throttle_keeps_lines() {
904        let (mode, degraded) = super::apply_verdict("lines:1-50", DegradationVerdictV1::Throttle);
905        assert_eq!(mode, "lines:1-50");
906        assert!(!degraded, "lines mode bypasses degradation");
907    }
908
909    #[test]
910    fn verdict_block_degrades_full_to_signatures() {
911        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Block);
912        assert_eq!(mode, "signatures");
913        assert!(degraded);
914    }
915
916    #[test]
917    fn verdict_block_does_not_degrade_signatures() {
918        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Block);
919        assert_eq!(mode, "signatures");
920        assert!(!degraded, "already at signatures — no degradation needed");
921    }
922
923    #[test]
924    fn degrade_warning_message_contains_mode_info() {
925        let (new_mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
926        assert!(degraded);
927        let warning = format!(
928            "⚠ Context pressure: mode=full was downgraded to mode={new_mode} (verdict: {:?}).",
929            DegradationVerdictV1::Warn
930        );
931        assert!(warning.contains("mode=full"));
932        assert!(warning.contains("mode=map"));
933        assert!(warning.contains("Warn"));
934    }
935
936    // --- auto_degrade_read_mode: no_degrade integration ---
937    // With default config (no LCTX_NO_DEGRADE), the profile's degradation.enforce
938    // is also off by default, so auto_degrade_read_mode returns mode unchanged.
939
940    #[test]
941    fn auto_degrade_preserves_full_when_default_config() {
942        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
943            return;
944        }
945        let (mode, warning) = super::auto_degrade_read_mode("full");
946        assert_eq!(mode, "full");
947        assert!(warning.is_none());
948    }
949
950    #[test]
951    fn auto_degrade_preserves_map_when_default_config() {
952        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
953            return;
954        }
955        let (mode, warning) = super::auto_degrade_read_mode("map");
956        assert_eq!(mode, "map");
957        assert!(warning.is_none());
958    }
959
960    #[test]
961    fn auto_degrade_preserves_signatures_when_default_config() {
962        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
963            return;
964        }
965        let (mode, warning) = super::auto_degrade_read_mode("signatures");
966        assert_eq!(mode, "signatures");
967        assert!(warning.is_none());
968    }
969
970    #[test]
971    fn auto_degrade_preserves_diff_always() {
972        let (mode, warning) = super::auto_degrade_read_mode("diff");
973        assert_eq!(mode, "diff");
974        assert!(warning.is_none());
975    }
976
977    #[test]
978    fn auto_degrade_preserves_lines_mode_always() {
979        let (mode, warning) = super::auto_degrade_read_mode("lines:10-50");
980        assert_eq!(mode, "lines:10-50");
981        assert!(warning.is_none());
982    }
983
984    #[test]
985    fn auto_degrade_preserves_aggressive_when_default_config() {
986        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
987            return;
988        }
989        let (mode, warning) = super::auto_degrade_read_mode("aggressive");
990        assert_eq!(mode, "aggressive");
991        assert!(warning.is_none());
992    }
993
994    #[test]
995    fn auto_degrade_preserves_entropy_when_default_config() {
996        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
997            return;
998        }
999        let (mode, warning) = super::auto_degrade_read_mode("entropy");
1000        assert_eq!(mode, "entropy");
1001        assert!(warning.is_none());
1002    }
1003
1004    #[test]
1005    fn auto_degrade_preserves_auto_when_default_config() {
1006        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
1007            return;
1008        }
1009        let (mode, warning) = super::auto_degrade_read_mode("auto");
1010        assert_eq!(mode, "auto");
1011        assert!(warning.is_none());
1012    }
1013
1014    // --- apply_verdict: exhaustive mode × verdict matrix ---
1015
1016    #[test]
1017    fn verdict_warn_does_not_degrade_diff() {
1018        let (mode, degraded) = super::apply_verdict("diff", DegradationVerdictV1::Warn);
1019        assert_eq!(mode, "diff");
1020        assert!(!degraded);
1021    }
1022
1023    #[test]
1024    fn verdict_throttle_does_not_degrade_signatures() {
1025        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Throttle);
1026        assert_eq!(mode, "signatures");
1027        assert!(!degraded);
1028    }
1029
1030    #[test]
1031    fn verdict_ok_preserves_map() {
1032        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Ok);
1033        assert_eq!(mode, "map");
1034        assert!(!degraded);
1035    }
1036
1037    #[test]
1038    fn verdict_ok_preserves_signatures() {
1039        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Ok);
1040        assert_eq!(mode, "signatures");
1041        assert!(!degraded);
1042    }
1043
1044    #[test]
1045    fn verdict_ok_preserves_lines() {
1046        let (mode, degraded) = super::apply_verdict("lines:1-100", DegradationVerdictV1::Ok);
1047        assert_eq!(mode, "lines:1-100");
1048        assert!(!degraded);
1049    }
1050
1051    #[test]
1052    fn verdict_block_degrades_map_to_signatures() {
1053        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Block);
1054        assert_eq!(mode, "signatures");
1055        assert!(degraded);
1056    }
1057}