Skip to main content

lean_ctx/tools/registered/
ctx_read.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{
10    get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
11};
12use crate::tool_defs::tool_def;
13
14/// Per-file lock that serializes concurrent reads of the same path.
15///
16/// When multiple subagents read sequentially through a shared set of files,
17/// they tend to hit the same path at the same time. Without per-file locking
18/// they all contend on the global cache write lock while doing redundant I/O.
19/// This lock ensures only one thread reads a given file from disk; the others
20/// wait cheaply on the per-file mutex, then hit the warm cache.
21fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
22    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
23        std::sync::OnceLock::new();
24    let map = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
25    let mut map = map.lock().unwrap_or_else(|poisoned| {
26        tracing::warn!("per_file_lock map poisoned; recovering");
27        poisoned.into_inner()
28    });
29
30    const MAX_ENTRIES: usize = 500;
31    if map.len() > MAX_ENTRIES {
32        map.retain(|_, v| Arc::strong_count(v) > 1);
33    }
34
35    map.entry(path.to_string())
36        .or_insert_with(|| Arc::new(Mutex::new(())))
37        .clone()
38}
39
40pub struct CtxReadTool;
41
42impl McpTool for CtxReadTool {
43    fn name(&self) -> &'static str {
44        "ctx_read"
45    }
46
47    fn tool_def(&self) -> Tool {
48        tool_def(
49            "ctx_read",
50            "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
51Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
52            json!({
53                "type": "object",
54                "properties": {
55                    "path": { "type": "string", "description": "Absolute file path to read" },
56                    "mode": {
57                        "type": "string",
58                        "description": "Compression mode (default: auto — resolved per file type/size). Explicit 'full' for guaranteed complete content. Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
59                    },
60                    "start_line": {
61                        "type": "integer",
62                        "description": "Start reading from this line (only used when no explicit mode is set, or with mode=lines). Does NOT override explicit modes like map/signatures."
63                    },
64                    "fresh": {
65                        "type": "boolean",
66                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
67                    }
68                },
69                "required": ["path"]
70            }),
71        )
72    }
73
74    fn handle(
75        &self,
76        args: &Map<String, Value>,
77        ctx: &ToolContext,
78    ) -> Result<ToolOutput, ErrorData> {
79        let path = require_resolved_path(ctx, args, "path")?;
80
81        match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
82            self.handle_inner(args, ctx, &path)
83        })) {
84            Ok(result) => result,
85            Err(_) => Err(ErrorData::internal_error(
86                format!("ctx_read panicked while processing '{path}'. This is a bug — please report it."),
87                None,
88            )),
89        }
90    }
91}
92
93impl CtxReadTool {
94    #[allow(clippy::unused_self)]
95    fn handle_inner(
96        &self,
97        args: &Map<String, Value>,
98        ctx: &ToolContext,
99        path: &str,
100    ) -> Result<ToolOutput, ErrorData> {
101        let session_lock = ctx
102            .session
103            .as_ref()
104            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
105        let cache_lock = ctx
106            .cache
107            .as_ref()
108            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
109
110        let current_task = {
111            let rt = tokio::runtime::Handle::current();
112            let mut attempt = 0u32;
113            loop {
114                if let Ok(session) = rt.block_on(tokio::time::timeout(
115                    std::time::Duration::from_secs(5),
116                    session_lock.read(),
117                )) {
118                    break session.task.as_ref().map(|t| t.description.clone());
119                }
120                attempt += 1;
121                if attempt >= 3 {
122                    tracing::warn!(
123                        "session read-lock timeout after {attempt} attempts in ctx_read for {path}"
124                    );
125                    return Err(ErrorData::internal_error(
126                        "session lock timeout — another tool may be holding it. Retry in a moment.",
127                        None,
128                    ));
129                }
130                tracing::debug!(
131                    "session read-lock attempt {attempt}/3 timed out for {path}, retrying"
132                );
133                std::thread::sleep(std::time::Duration::from_millis(100 * u64::from(attempt)));
134            }
135        };
136        let task_ref = current_task.as_deref();
137
138        let profile = crate::core::profiles::active_profile();
139        let explicit_mode_arg = get_str(args, "mode");
140        let explicit_mode = explicit_mode_arg.is_some();
141        let mut mode = if let Some(m) = explicit_mode_arg {
142            m
143        } else if profile.read.default_mode_effective() == "auto" {
144            if let Ok(cache) = cache_lock.try_read() {
145                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
146            } else {
147                tracing::debug!(
148                    "cache lock contested during auto-mode selection for {path}; \
149                     falling back to full"
150                );
151                "full".to_string()
152            }
153        } else {
154            profile.read.default_mode_effective().to_string()
155        };
156        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
157        let cache_policy = crate::server::compaction_sync::effective_cache_policy();
158        if cache_policy == "off" {
159            fresh = true;
160        }
161        let start_line = get_int(args, "start_line");
162        if let Some(sl) = start_line {
163            let sl = sl.max(1_i64);
164            if sl > 1 {
165                fresh = true;
166                // Only override mode when no explicit mode was requested,
167                // or when the explicit mode is already a lines range.
168                // If the caller explicitly set mode=map/signatures/etc.,
169                // start_line must not clobber it (GitHub #259).
170                if !explicit_mode || mode.starts_with("lines") {
171                    mode = format!("lines:{sl}-999999");
172                }
173            }
174        }
175
176        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
177        let resolved_agent_id = ctx.agent_id.as_ref().and_then(|a| match a.try_read() {
178            Ok(guard) => guard.clone(),
179            Err(_) => None,
180        });
181        let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
182            path,
183            &mode,
184            task_ref,
185            Some(&ctx.project_root),
186            pressure_action,
187            resolved_agent_id.as_deref(),
188        );
189        if gate_result.budget_blocked {
190            let msg = gate_result
191                .budget_warning
192                .unwrap_or_else(|| "Agent token budget exceeded".to_string());
193            return Err(ErrorData::invalid_params(msg, None));
194        }
195        let budget_warning = gate_result.budget_warning.clone();
196        if let Some(overridden) = gate_result.overridden_mode {
197            mode = overridden;
198        }
199
200        let (mode, degrade_warning) = if crate::tools::ctx_read::is_instruction_file(path) {
201            ("full".to_string(), None)
202        } else {
203            auto_degrade_read_mode(&mode)
204        };
205
206        if mode.starts_with("lines:") {
207            fresh = true;
208        }
209
210        if crate::core::binary_detect::is_binary_file(path) {
211            let msg = crate::core::binary_detect::binary_file_message(path);
212            return Err(ErrorData::invalid_params(msg, None));
213        }
214        {
215            let cap = crate::core::limits::max_read_bytes() as u64;
216            if let Ok(meta) = std::fs::metadata(path) {
217                if meta.len() > cap {
218                    let msg = format!(
219                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
220                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
221                        meta.len(),
222                        cap
223                    );
224                    return Err(ErrorData::invalid_params(msg, None));
225                }
226            }
227        }
228
229        // Compaction-aware: if host compacted since last check, reset delivery flags
230        // so post-compaction reads deliver full content instead of stubs.
231        if !fresh {
232            if let Ok(data_dir) = crate::core::data_dir::lean_ctx_data_dir() {
233                if let Ok(mut cache) = cache_lock.try_write() {
234                    crate::server::compaction_sync::sync_if_compacted(&mut cache, &data_dir);
235                }
236            }
237        }
238
239        // Fast path: if both per-file lock and cache write-lock are immediately
240        // available, execute inline without spawning a thread. This avoids thread +
241        // channel overhead for the ~90% of calls that are cache hits.
242        let read_timeout = std::time::Duration::from_secs(30);
243        let cancelled = Arc::new(AtomicBool::new(false));
244        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
245            let crp_mode = ctx.crp_mode;
246            let task_ref = current_task.as_deref();
247
248            let fast_result = 'fast: {
249                let file_lock = per_file_lock(path);
250                let Some(_file_guard) = file_lock.try_lock().ok() else {
251                    break 'fast None;
252                };
253                let Some(mut cache) = cache_lock.try_write().ok() else {
254                    break 'fast None;
255                };
256                let read_output = if fresh {
257                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
258                        &mut cache, path, &mode, crp_mode, task_ref,
259                    )
260                } else {
261                    crate::tools::ctx_read::handle_with_task_resolved(
262                        &mut cache, path, &mode, crp_mode, task_ref,
263                    )
264                };
265                let content = read_output.content;
266                let rmode = read_output.resolved_mode;
267                let orig = cache.get(path).map_or(0, |e| e.original_tokens);
268                let hit = content.contains(" cached ")
269                    || content.contains("[unchanged")
270                    || content.contains("[delta:");
271                let fref = cache.file_ref_map().get(path).cloned();
272                let stats = cache.get_stats();
273                let stats_snapshot = (stats.total_reads, stats.cache_hits);
274                Some((content, rmode, orig, hit, fref, stats_snapshot))
275            };
276
277            if let Some(result) = fast_result {
278                result
279            } else {
280                // Slow path: spawn thread with bounded timeout for contended locks.
281                let cache_lock = cache_lock.clone();
282                let mode = mode.clone();
283                let task_owned = current_task.clone();
284                let path_owned = path.to_string();
285                let cancel_flag = cancelled.clone();
286                let (tx, rx) = std::sync::mpsc::sync_channel(1);
287                std::thread::spawn(move || {
288                    let file_lock = per_file_lock(&path_owned);
289
290                    // Bounded per-file lock: if a zombie thread still holds it, don't
291                    // wait forever. 25s keeps us inside the 30s recv_timeout.
292                    let _file_guard = {
293                        let deadline =
294                            std::time::Instant::now() + std::time::Duration::from_secs(25);
295                        loop {
296                            if cancel_flag.load(Ordering::Relaxed) {
297                                return;
298                            }
299                            if let Ok(guard) = file_lock.try_lock() {
300                                break guard;
301                            }
302                            if std::time::Instant::now() >= deadline {
303                                tracing::error!(
304                                    "ctx_read: per-file lock timeout after 25s for {path_owned}"
305                                );
306                                let _ = tx.send((
307                                    format!("per-file lock contention for {path_owned} — retry in a moment"),
308                                    "error".to_string(), 0, false, None, (0, 0),
309                                ));
310                                return;
311                            }
312                            std::thread::sleep(std::time::Duration::from_millis(50));
313                        }
314                    };
315
316                    if cancel_flag.load(Ordering::Relaxed) {
317                        return;
318                    }
319
320                    // Bounded cache write-lock: avoids indefinite block when a zombie
321                    // thread from a previous timed-out call still holds the lock.
322                    let mut cache = {
323                        let deadline =
324                            std::time::Instant::now() + std::time::Duration::from_secs(25);
325                        loop {
326                            if cancel_flag.load(Ordering::Relaxed) {
327                                return;
328                            }
329                            if let Ok(guard) = cache_lock.try_write() {
330                                break guard;
331                            }
332                            if std::time::Instant::now() >= deadline {
333                                tracing::error!(
334                                    "ctx_read: cache write-lock timeout after 25s for {path_owned}"
335                                );
336                                let _ = tx.send((
337                                    format!("cache lock contention for {path_owned} — retry in a moment"),
338                                    "error".to_string(), 0, false, None, (0, 0),
339                                ));
340                                return;
341                            }
342                            std::thread::sleep(std::time::Duration::from_millis(50));
343                        }
344                    };
345
346                    let task_ref = task_owned.as_deref();
347                    let read_output = if fresh {
348                        crate::tools::ctx_read::handle_fresh_with_task_resolved(
349                            &mut cache,
350                            &path_owned,
351                            &mode,
352                            crp_mode,
353                            task_ref,
354                        )
355                    } else {
356                        crate::tools::ctx_read::handle_with_task_resolved(
357                            &mut cache,
358                            &path_owned,
359                            &mode,
360                            crp_mode,
361                            task_ref,
362                        )
363                    };
364                    let content = read_output.content;
365                    let rmode = read_output.resolved_mode;
366                    let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
367                    let hit = content.contains(" cached ");
368                    let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
369                    let stats = cache.get_stats();
370                    let stats_snapshot = (stats.total_reads, stats.cache_hits);
371                    let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
372                });
373                if let Ok(result) = rx.recv_timeout(read_timeout) {
374                    result
375                } else {
376                    cancelled.store(true, Ordering::Relaxed);
377                    tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
378                    let msg = format!(
379                        "ERROR: ctx_read timed out after {}s reading {path}. \
380                     The file may be very large or a blocking I/O issue occurred. \
381                     Try mode=\"lines:1-100\" for a partial read.",
382                        read_timeout.as_secs()
383                    );
384                    return Err(ErrorData::internal_error(msg, None));
385                }
386            } // end else (slow path)
387        };
388
389        // Convert error results to proper MCP ErrorData instead of success body
390        if resolved_mode == "error" {
391            return Err(ErrorData::invalid_params(output, None));
392        }
393
394        let output_tokens = crate::core::tokens::count_tokens(&output);
395        let saved = original.saturating_sub(output_tokens);
396
397        // Session updates (bounded lock — 10s timeout, read already succeeded)
398        let mut ensured_root: Option<String> = None;
399        let project_root_snapshot;
400        {
401            let rt = tokio::runtime::Handle::current();
402            let session_guard = rt.block_on(tokio::time::timeout(
403                std::time::Duration::from_secs(10),
404                session_lock.write(),
405            ));
406            if let Ok(mut session) = session_guard {
407                session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
408                // Auto-generate file summary from output content
409                let file_summary = extract_file_summary(&output, path);
410                if !file_summary.is_empty() {
411                    session.set_file_summary(path, &file_summary);
412                }
413                if is_cache_hit {
414                    session.record_cache_hit();
415                }
416                if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
417                    let touched: Vec<String> = session
418                        .files_touched
419                        .iter()
420                        .map(|f| f.path.clone())
421                        .collect();
422                    let inferred =
423                        crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
424                    if inferred.confidence >= 0.4 {
425                        session.active_structured_intent = Some(inferred);
426                    }
427                }
428                // Auto-infer task every 5th file read if not explicitly set
429                if session.task.is_none() && session.stats.files_read % 5 == 0 {
430                    session.auto_infer_task();
431                }
432                let root_missing = session
433                    .project_root
434                    .as_deref()
435                    .is_none_or(|r| r.trim().is_empty());
436                if root_missing {
437                    if let Some(root) = crate::core::protocol::detect_project_root(path) {
438                        session.project_root = Some(root.clone());
439                        ensured_root = Some(root);
440                    }
441                }
442                project_root_snapshot = session
443                    .project_root
444                    .clone()
445                    .unwrap_or_else(|| ".".to_string());
446            } else {
447                tracing::warn!(
448                    "session write-lock timeout (5s) in ctx_read post-update for {path}"
449                );
450                project_root_snapshot = ctx.project_root.clone();
451            }
452        }
453
454        if let Some(root) = ensured_root.as_deref() {
455            crate::core::index_orchestrator::ensure_all_background(root);
456        }
457
458        crate::core::heatmap::record_file_access(path, original, saved);
459
460        // Mode predictor + feedback — no locks needed, uses snapshots from above
461        {
462            let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
463            let density = if output_tokens > 0 {
464                original as f64 / output_tokens as f64
465            } else {
466                1.0
467            };
468            let outcome = crate::core::mode_predictor::ModeOutcome {
469                mode: resolved_mode.clone(),
470                tokens_in: original,
471                tokens_out: output_tokens,
472                density: density.min(1.0),
473            };
474            let mut predictor = crate::core::mode_predictor::ModePredictor::new();
475            predictor.set_project_root(&project_root_snapshot);
476            predictor.record(sig, outcome);
477            predictor.save();
478
479            let ext = std::path::Path::new(path)
480                .extension()
481                .and_then(|e| e.to_str())
482                .unwrap_or("")
483                .to_string();
484            let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
485            let feedback_outcome = crate::core::feedback::CompressionOutcome {
486                session_id: format!("{}", std::process::id()),
487                language: ext,
488                entropy_threshold: thresholds.bpe_entropy,
489                jaccard_threshold: thresholds.jaccard,
490                total_turns: cache_stats.0 as u32,
491                tokens_saved: saved as u64,
492                tokens_original: original as u64,
493                cache_hits: cache_stats.1 as u32,
494                total_reads: cache_stats.0 as u32,
495                task_completed: true,
496                timestamp: chrono::Local::now().to_rfc3339(),
497            };
498            let mut store = crate::core::feedback::FeedbackStore::load();
499            store.project_root = Some(project_root_snapshot.clone());
500            store.record_outcome(feedback_outcome);
501        }
502
503        if let Some(aid) = resolved_agent_id.as_deref() {
504            crate::core::agent_budget::record_consumption(aid, output_tokens);
505        }
506
507        // Cross-source hints: if a graph index exists and has cross-source edges
508        // pointing to this file, append compact hints so the agent knows about
509        // related issues/PRs/schemas without a separate tool call.
510        let hints_suffix = {
511            if let Some(index) = crate::core::graph_index::ProjectIndex::load(&ctx.project_root) {
512                let hints = crate::core::cross_source_hints::hints_for_file(
513                    path,
514                    &index.edges,
515                    &ctx.project_root,
516                );
517                if hints.is_empty() {
518                    String::new()
519                } else {
520                    crate::core::cross_source_hints::format_hints(&hints)
521                }
522            } else {
523                String::new()
524            }
525        };
526
527        let mut warnings = Vec::new();
528        if let Some(ref w) = budget_warning {
529            warnings.push(w.as_str());
530        }
531        if let Some(ref w) = degrade_warning {
532            warnings.push(w.as_str());
533        }
534        let final_output = if !warnings.is_empty() {
535            format!("{output}{hints_suffix}\n\n{}", warnings.join("\n"))
536        } else if hints_suffix.is_empty() {
537            output
538        } else {
539            format!("{output}{hints_suffix}")
540        };
541
542        Ok(ToolOutput {
543            text: final_output,
544            original_tokens: original,
545            saved_tokens: saved,
546            mode: Some(resolved_mode),
547            path: Some(path.to_string()),
548            changed: false,
549        })
550    }
551}
552
553fn apply_verdict(
554    mode: &str,
555    verdict: crate::core::degradation_policy::DegradationVerdictV1,
556) -> (String, bool) {
557    use crate::core::degradation_policy::DegradationVerdictV1;
558    match verdict {
559        DegradationVerdictV1::Ok => (mode.to_string(), false),
560        DegradationVerdictV1::Warn => match mode {
561            "full" => ("map".to_string(), true),
562            other => (other.to_string(), false),
563        },
564        DegradationVerdictV1::Throttle => match mode {
565            "full" | "map" => ("signatures".to_string(), true),
566            other => (other.to_string(), false),
567        },
568        DegradationVerdictV1::Block => {
569            if mode == "signatures" {
570                ("signatures".to_string(), false)
571            } else {
572                ("signatures".to_string(), true)
573            }
574        }
575    }
576}
577
578fn auto_degrade_read_mode(mode: &str) -> (String, Option<String>) {
579    if crate::core::config::Config::load().no_degrade_effective() {
580        return (mode.to_string(), None);
581    }
582    let profile = crate::core::profiles::active_profile();
583    if !profile.degradation.enforce_effective() {
584        return (mode.to_string(), None);
585    }
586    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
587    let (new_mode, degraded) = apply_verdict(mode, policy.decision.verdict);
588    let warning = if degraded {
589        Some(format!(
590            "⚠ Context pressure: mode={mode} was downgraded to mode={new_mode} \
591             (verdict: {:?}). Use start_line=1 to bypass, or run ctx_compress to free budget.",
592            policy.decision.verdict
593        ))
594    } else {
595        None
596    };
597    (new_mode, warning)
598}
599
600fn extract_file_summary(output: &str, path: &str) -> String {
601    let hint = crate::core::auto_findings::extract_content_hint(output);
602    if !hint.is_empty() {
603        return hint;
604    }
605    let ext = std::path::Path::new(path)
606        .extension()
607        .and_then(|e| e.to_str())
608        .unwrap_or("");
609    let line_count = output.lines().count();
610    if line_count > 5 {
611        format!("{ext} file, {line_count} lines")
612    } else {
613        String::new()
614    }
615}
616
617#[cfg(test)]
618mod tests {
619    use super::*;
620    use std::sync::atomic::{AtomicUsize, Ordering};
621
622    #[test]
623    fn per_file_lock_same_path_returns_same_mutex() {
624        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
625        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
626        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
627    }
628
629    #[test]
630    fn per_file_lock_different_paths_return_different_mutexes() {
631        let lock_a = per_file_lock("/tmp/test_path_a.txt");
632        let lock_b = per_file_lock("/tmp/test_path_b.txt");
633        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
634    }
635
636    #[test]
637    fn per_file_lock_serializes_concurrent_access() {
638        let counter = Arc::new(AtomicUsize::new(0));
639        let max_concurrent = Arc::new(AtomicUsize::new(0));
640        let path = "/tmp/test_concurrent_serialization.txt";
641        let mut handles = Vec::new();
642
643        for _ in 0..5 {
644            let counter = counter.clone();
645            let max_concurrent = max_concurrent.clone();
646            let path = path.to_string();
647            handles.push(std::thread::spawn(move || {
648                let lock = per_file_lock(&path);
649                let _guard = lock.lock().unwrap();
650                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
651                max_concurrent.fetch_max(active, Ordering::SeqCst);
652                std::thread::sleep(std::time::Duration::from_millis(10));
653                counter.fetch_sub(1, Ordering::SeqCst);
654            }));
655        }
656
657        for h in handles {
658            h.join().unwrap();
659        }
660
661        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
662    }
663
664    #[test]
665    fn per_file_lock_allows_parallel_different_paths() {
666        let counter = Arc::new(AtomicUsize::new(0));
667        let max_concurrent = Arc::new(AtomicUsize::new(0));
668        let mut handles = Vec::new();
669
670        for i in 0..4 {
671            let counter = counter.clone();
672            let max_concurrent = max_concurrent.clone();
673            let path = format!("/tmp/test_parallel_{i}.txt");
674            handles.push(std::thread::spawn(move || {
675                let lock = per_file_lock(&path);
676                let _guard = lock.lock().unwrap();
677                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
678                max_concurrent.fetch_max(active, Ordering::SeqCst);
679                std::thread::sleep(std::time::Duration::from_millis(50));
680                counter.fetch_sub(1, Ordering::SeqCst);
681            }));
682        }
683
684        for h in handles {
685            h.join().unwrap();
686        }
687
688        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
689    }
690
691    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
692    /// must not block subsequent reads indefinitely. The try_write() loop inside
693    /// the spawned thread should respect its 25s deadline and the cancellation flag.
694    #[test]
695    fn zombie_thread_does_not_block_subsequent_cache_access() {
696        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));
697
698        // Simulate a zombie: hold the write-lock on a background thread for 2s.
699        let zombie_lock = cache.clone();
700        let _zombie = std::thread::spawn(move || {
701            let _guard = zombie_lock.blocking_write();
702            std::thread::sleep(std::time::Duration::from_secs(2));
703        });
704        std::thread::sleep(std::time::Duration::from_millis(50));
705
706        // A try_read() must fail immediately (zombie holds write-lock).
707        assert!(cache.try_read().is_err());
708
709        // A try_write() loop with cancellation must exit promptly.
710        let cancel = Arc::new(AtomicBool::new(false));
711        let cancel2 = cancel.clone();
712        let lock2 = cache.clone();
713        let waiter = std::thread::spawn(move || {
714            let start = std::time::Instant::now();
715            loop {
716                if cancel2.load(Ordering::Relaxed) {
717                    return (false, start.elapsed());
718                }
719                if let Ok(_guard) = lock2.try_write() {
720                    return (true, start.elapsed());
721                }
722                std::thread::sleep(std::time::Duration::from_millis(50));
723            }
724        });
725
726        // Set cancellation after 200ms — the loop should exit quickly.
727        std::thread::sleep(std::time::Duration::from_millis(200));
728        cancel.store(true, Ordering::Relaxed);
729
730        let (acquired, elapsed) = waiter.join().unwrap();
731        assert!(
732            !acquired,
733            "should not have acquired lock while zombie holds it"
734        );
735        assert!(
736            elapsed < std::time::Duration::from_secs(1),
737            "cancellation should have stopped the loop promptly"
738        );
739    }
740
741    // -- Regression: GitHub Issue #253 + #259 --
742    // Helper that mirrors the runtime start_line logic.
743    fn apply_start_line(
744        mode: &mut String,
745        fresh: &mut bool,
746        explicit_mode: bool,
747        start_line: Option<i64>,
748    ) {
749        if let Some(sl) = start_line {
750            let sl = sl.max(1_i64);
751            if sl <= 1 {
752                return;
753            }
754            *fresh = true;
755            if !explicit_mode || mode.starts_with("lines") {
756                *mode = format!("lines:{sl}-999999");
757            }
758        }
759    }
760
761    #[test]
762    fn start_line_1_does_not_override_mode() {
763        let mut mode = "auto".to_string();
764        let mut fresh = false;
765        apply_start_line(&mut mode, &mut fresh, false, Some(1));
766        assert_eq!(mode, "auto", "start_line=1 should not change mode");
767        assert!(!fresh, "start_line=1 should not force fresh=true");
768    }
769
770    #[test]
771    fn start_line_gt1_overrides_implicit_mode() {
772        let mut mode = "auto".to_string();
773        let mut fresh = false;
774        apply_start_line(&mut mode, &mut fresh, false, Some(50));
775        assert_eq!(mode, "lines:50-999999");
776        assert!(fresh);
777    }
778
779    #[test]
780    fn start_line_gt1_does_not_override_explicit_map() {
781        // GitHub #259: mode=map + start_line=50 → mode stays map
782        let mut mode = "map".to_string();
783        let mut fresh = false;
784        apply_start_line(&mut mode, &mut fresh, true, Some(50));
785        assert_eq!(
786            mode, "map",
787            "explicit mode=map must not be clobbered by start_line"
788        );
789        assert!(fresh, "start_line>1 should still force fresh");
790    }
791
792    #[test]
793    fn start_line_gt1_does_not_override_explicit_signatures() {
794        let mut mode = "signatures".to_string();
795        let mut fresh = false;
796        apply_start_line(&mut mode, &mut fresh, true, Some(100));
797        assert_eq!(mode, "signatures");
798        assert!(fresh);
799    }
800
801    #[test]
802    fn start_line_gt1_honors_explicit_lines_mode() {
803        let mut mode = "lines:1-50".to_string();
804        let mut fresh = false;
805        apply_start_line(&mut mode, &mut fresh, true, Some(30));
806        assert_eq!(
807            mode, "lines:30-999999",
808            "explicit lines mode should accept start_line override"
809        );
810        assert!(fresh);
811    }
812
813    #[test]
814    fn start_line_none_does_nothing() {
815        let mut mode = "map".to_string();
816        let mut fresh = false;
817        apply_start_line(&mut mode, &mut fresh, true, None);
818        assert_eq!(mode, "map");
819        assert!(!fresh);
820    }
821
822    #[test]
823    fn start_line_1_with_explicit_mode_preserves_it() {
824        // OpenCode sends start_line=1 + mode=map — both should be preserved
825        let mut mode = "map".to_string();
826        let mut fresh = false;
827        apply_start_line(&mut mode, &mut fresh, true, Some(1));
828        assert_eq!(mode, "map");
829        assert!(!fresh);
830    }
831
832    // -- Regression: GitHub Issue #262 --
833    // auto_degrade_read_mode must produce a warning when mode is downgraded.
834
835    use crate::core::degradation_policy::DegradationVerdictV1;
836
837    #[test]
838    fn verdict_ok_does_not_degrade() {
839        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Ok);
840        assert_eq!(mode, "full");
841        assert!(!degraded);
842    }
843
844    #[test]
845    fn verdict_warn_degrades_full_to_map() {
846        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
847        assert_eq!(mode, "map");
848        assert!(degraded, "full→map must be flagged as degraded");
849    }
850
851    #[test]
852    fn verdict_warn_keeps_map() {
853        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Warn);
854        assert_eq!(mode, "map");
855        assert!(!degraded, "map is not degraded under Warn");
856    }
857
858    #[test]
859    fn verdict_warn_keeps_signatures() {
860        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Warn);
861        assert_eq!(mode, "signatures");
862        assert!(!degraded);
863    }
864
865    #[test]
866    fn verdict_throttle_degrades_full_to_signatures() {
867        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Throttle);
868        assert_eq!(mode, "signatures");
869        assert!(degraded);
870    }
871
872    #[test]
873    fn verdict_throttle_degrades_map_to_signatures() {
874        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Throttle);
875        assert_eq!(mode, "signatures");
876        assert!(degraded);
877    }
878
879    #[test]
880    fn verdict_throttle_keeps_lines() {
881        let (mode, degraded) = super::apply_verdict("lines:1-50", DegradationVerdictV1::Throttle);
882        assert_eq!(mode, "lines:1-50");
883        assert!(!degraded, "lines mode bypasses degradation");
884    }
885
886    #[test]
887    fn verdict_block_degrades_full_to_signatures() {
888        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Block);
889        assert_eq!(mode, "signatures");
890        assert!(degraded);
891    }
892
893    #[test]
894    fn verdict_block_does_not_degrade_signatures() {
895        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Block);
896        assert_eq!(mode, "signatures");
897        assert!(!degraded, "already at signatures — no degradation needed");
898    }
899
900    #[test]
901    fn degrade_warning_message_contains_mode_info() {
902        let (new_mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
903        assert!(degraded);
904        let warning = format!(
905            "⚠ Context pressure: mode=full was downgraded to mode={new_mode} (verdict: {:?}).",
906            DegradationVerdictV1::Warn
907        );
908        assert!(warning.contains("mode=full"));
909        assert!(warning.contains("mode=map"));
910        assert!(warning.contains("Warn"));
911    }
912
913    // --- auto_degrade_read_mode: no_degrade integration ---
914    // With default config (no LCTX_NO_DEGRADE), the profile's degradation.enforce
915    // is also off by default, so auto_degrade_read_mode returns mode unchanged.
916
917    #[test]
918    fn auto_degrade_preserves_full_when_default_config() {
919        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
920            return;
921        }
922        let (mode, warning) = super::auto_degrade_read_mode("full");
923        assert_eq!(mode, "full");
924        assert!(warning.is_none());
925    }
926
927    #[test]
928    fn auto_degrade_preserves_map_when_default_config() {
929        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
930            return;
931        }
932        let (mode, warning) = super::auto_degrade_read_mode("map");
933        assert_eq!(mode, "map");
934        assert!(warning.is_none());
935    }
936
937    #[test]
938    fn auto_degrade_preserves_signatures_when_default_config() {
939        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
940            return;
941        }
942        let (mode, warning) = super::auto_degrade_read_mode("signatures");
943        assert_eq!(mode, "signatures");
944        assert!(warning.is_none());
945    }
946
947    #[test]
948    fn auto_degrade_preserves_diff_always() {
949        let (mode, warning) = super::auto_degrade_read_mode("diff");
950        assert_eq!(mode, "diff");
951        assert!(warning.is_none());
952    }
953
954    #[test]
955    fn auto_degrade_preserves_lines_mode_always() {
956        let (mode, warning) = super::auto_degrade_read_mode("lines:10-50");
957        assert_eq!(mode, "lines:10-50");
958        assert!(warning.is_none());
959    }
960
961    #[test]
962    fn auto_degrade_preserves_aggressive_when_default_config() {
963        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
964            return;
965        }
966        let (mode, warning) = super::auto_degrade_read_mode("aggressive");
967        assert_eq!(mode, "aggressive");
968        assert!(warning.is_none());
969    }
970
971    #[test]
972    fn auto_degrade_preserves_entropy_when_default_config() {
973        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
974            return;
975        }
976        let (mode, warning) = super::auto_degrade_read_mode("entropy");
977        assert_eq!(mode, "entropy");
978        assert!(warning.is_none());
979    }
980
981    #[test]
982    fn auto_degrade_preserves_auto_when_default_config() {
983        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
984            return;
985        }
986        let (mode, warning) = super::auto_degrade_read_mode("auto");
987        assert_eq!(mode, "auto");
988        assert!(warning.is_none());
989    }
990
991    // --- apply_verdict: exhaustive mode × verdict matrix ---
992
993    #[test]
994    fn verdict_warn_does_not_degrade_diff() {
995        let (mode, degraded) = super::apply_verdict("diff", DegradationVerdictV1::Warn);
996        assert_eq!(mode, "diff");
997        assert!(!degraded);
998    }
999
1000    #[test]
1001    fn verdict_throttle_does_not_degrade_signatures() {
1002        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Throttle);
1003        assert_eq!(mode, "signatures");
1004        assert!(!degraded);
1005    }
1006
1007    #[test]
1008    fn verdict_ok_preserves_map() {
1009        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Ok);
1010        assert_eq!(mode, "map");
1011        assert!(!degraded);
1012    }
1013
1014    #[test]
1015    fn verdict_ok_preserves_signatures() {
1016        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Ok);
1017        assert_eq!(mode, "signatures");
1018        assert!(!degraded);
1019    }
1020
1021    #[test]
1022    fn verdict_ok_preserves_lines() {
1023        let (mode, degraded) = super::apply_verdict("lines:1-100", DegradationVerdictV1::Ok);
1024        assert_eq!(mode, "lines:1-100");
1025        assert!(!degraded);
1026    }
1027
1028    #[test]
1029    fn verdict_block_degrades_map_to_signatures() {
1030        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Block);
1031        assert_eq!(mode, "signatures");
1032        assert!(degraded);
1033    }
1034}