Skip to main content

lean_ctx/tools/registered/
ctx_read.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::{Arc, Mutex};
4
5use rmcp::model::Tool;
6use rmcp::ErrorData;
7use serde_json::{json, Map, Value};
8
9use crate::server::tool_trait::{
10    get_bool, get_int, get_str, require_resolved_path, McpTool, ToolContext, ToolOutput,
11};
12use crate::tool_defs::tool_def;
13
14/// Per-file lock that serializes concurrent reads of the same path.
15///
16/// When multiple subagents read sequentially through a shared set of files,
17/// they tend to hit the same path at the same time. Without per-file locking
18/// they all contend on the global cache write lock while doing redundant I/O.
19/// This lock ensures only one thread reads a given file from disk; the others
20/// wait cheaply on the per-file mutex, then hit the warm cache.
21fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
22    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
23        std::sync::OnceLock::new();
24    let map = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
25    let mut map = map.lock().unwrap_or_else(|poisoned| {
26        tracing::warn!("per_file_lock map poisoned; recovering");
27        poisoned.into_inner()
28    });
29
30    const MAX_ENTRIES: usize = 500;
31    if map.len() > MAX_ENTRIES {
32        map.retain(|_, v| Arc::strong_count(v) > 1);
33    }
34
35    map.entry(path.to_string())
36        .or_insert_with(|| Arc::new(Mutex::new(())))
37        .clone()
38}
39
40pub struct CtxReadTool;
41
42impl McpTool for CtxReadTool {
43    fn name(&self) -> &'static str {
44        "ctx_read"
45    }
46
47    fn tool_def(&self) -> Tool {
48        tool_def(
49            "ctx_read",
50            "Read file (cached, compressed). Cached re-reads can be ~13 tok when unchanged. Auto-selects optimal mode. \
51Modes: full|map|signatures|diff|aggressive|entropy|task|reference|lines:N-M. fresh=true forces a disk re-read.",
52            json!({
53                "type": "object",
54                "properties": {
55                    "path": { "type": "string", "description": "Absolute file path to read" },
56                    "mode": {
57                        "type": "string",
58                        "description": "Compression mode (default: full). Use 'map' for context-only files. For line ranges: 'lines:N-M' (e.g. 'lines:400-500')."
59                    },
60                    "start_line": {
61                        "type": "integer",
62                        "description": "Start reading from this line (only used when no explicit mode is set, or with mode=lines). Does NOT override explicit modes like map/signatures."
63                    },
64                    "fresh": {
65                        "type": "boolean",
66                        "description": "Bypass cache and force a full re-read. Use when running as a subagent that may not have the parent's context."
67                    }
68                },
69                "required": ["path"]
70            }),
71        )
72    }
73
74    fn handle(
75        &self,
76        args: &Map<String, Value>,
77        ctx: &ToolContext,
78    ) -> Result<ToolOutput, ErrorData> {
79        let path = require_resolved_path(ctx, args, "path")?;
80
81        self.handle_inner(args, ctx, &path)
82    }
83}
84
85impl CtxReadTool {
86    #[allow(clippy::unused_self)]
87    fn handle_inner(
88        &self,
89        args: &Map<String, Value>,
90        ctx: &ToolContext,
91        path: &str,
92    ) -> Result<ToolOutput, ErrorData> {
93        let session_lock = ctx
94            .session
95            .as_ref()
96            .ok_or_else(|| ErrorData::internal_error("session not available", None))?;
97        let cache_lock = ctx
98            .cache
99            .as_ref()
100            .ok_or_else(|| ErrorData::internal_error("cache not available", None))?;
101
102        let current_task = {
103            let mut attempt = 0u32;
104            loop {
105                if let Ok(session) = tokio::task::block_in_place(|| {
106                    let rt = tokio::runtime::Handle::current();
107                    rt.block_on(tokio::time::timeout(
108                        std::time::Duration::from_secs(5),
109                        session_lock.read(),
110                    ))
111                }) {
112                    break session.task.as_ref().map(|t| t.description.clone());
113                }
114                attempt += 1;
115                if attempt >= 3 {
116                    tracing::warn!(
117                        "session read-lock timeout after {attempt} attempts in ctx_read for {path}"
118                    );
119                    return Err(ErrorData::internal_error(
120                        "session lock timeout — another tool may be holding it. Retry in a moment.",
121                        None,
122                    ));
123                }
124                tracing::debug!(
125                    "session read-lock attempt {attempt}/3 timed out for {path}, retrying"
126                );
127                std::thread::sleep(std::time::Duration::from_millis(100 * u64::from(attempt)));
128            }
129        };
130        let task_ref = current_task.as_deref();
131
132        let profile = crate::core::profiles::active_profile();
133        let explicit_mode_arg = get_str(args, "mode");
134        let explicit_mode = explicit_mode_arg.is_some();
135        let mut mode = if let Some(m) = explicit_mode_arg {
136            m
137        } else if profile.read.default_mode_effective() == "auto" {
138            if let Ok(cache) = cache_lock.try_read() {
139                crate::tools::ctx_smart_read::select_mode_with_task(&cache, path, task_ref)
140            } else {
141                tracing::debug!(
142                    "cache lock contested during auto-mode selection for {path}; \
143                     falling back to full"
144                );
145                "full".to_string()
146            }
147        } else {
148            profile.read.default_mode_effective().to_string()
149        };
150        let mut fresh = get_bool(args, "fresh").unwrap_or(false);
151        let cache_policy = crate::server::compaction_sync::effective_cache_policy();
152        if cache_policy == "off" {
153            fresh = true;
154        }
155        let start_line = get_int(args, "start_line");
156        if let Some(sl) = start_line {
157            let sl = sl.max(1_i64);
158            if sl > 1 {
159                fresh = true;
160                // Only override mode when no explicit mode was requested,
161                // or when the explicit mode is already a lines range.
162                // If the caller explicitly set mode=map/signatures/etc.,
163                // start_line must not clobber it (GitHub #259).
164                if !explicit_mode || mode.starts_with("lines") {
165                    mode = format!("lines:{sl}-999999");
166                }
167            }
168        }
169
170        let pressure_action = ctx.pressure_snapshot.as_ref().map(|p| &p.recommendation);
171        let resolved_agent_id = ctx.agent_id.as_ref().and_then(|a| match a.try_read() {
172            Ok(guard) => guard.clone(),
173            Err(_) => None,
174        });
175        let gate_result = crate::server::context_gate::pre_dispatch_read_for_agent(
176            path,
177            &mode,
178            task_ref,
179            Some(&ctx.project_root),
180            pressure_action,
181            resolved_agent_id.as_deref(),
182        );
183        if gate_result.budget_blocked {
184            let msg = gate_result
185                .budget_warning
186                .unwrap_or_else(|| "Agent token budget exceeded".to_string());
187            return Err(ErrorData::invalid_params(msg, None));
188        }
189        let budget_warning = gate_result.budget_warning.clone();
190        if let Some(overridden) = gate_result.overridden_mode {
191            mode = overridden;
192        }
193
194        let (mode, degrade_warning) = if crate::tools::ctx_read::is_instruction_file(path) {
195            ("full".to_string(), None)
196        } else {
197            auto_degrade_read_mode(&mode)
198        };
199
200        if mode.starts_with("lines:") {
201            fresh = true;
202        }
203
204        if crate::core::binary_detect::is_binary_file(path) {
205            let msg = crate::core::binary_detect::binary_file_message(path);
206            return Err(ErrorData::invalid_params(msg, None));
207        }
208        {
209            let cap = crate::core::limits::max_read_bytes() as u64;
210            if let Ok(meta) = std::fs::metadata(path) {
211                if meta.len() > cap {
212                    let msg = format!(
213                        "File too large ({} bytes, limit {} bytes via LCTX_MAX_READ_BYTES). \
214                         Use mode=\"lines:1-100\" for partial reads or increase the limit.",
215                        meta.len(),
216                        cap
217                    );
218                    return Err(ErrorData::invalid_params(msg, None));
219                }
220            }
221        }
222
223        // Compaction-aware: if host compacted since last check, reset delivery flags
224        // so post-compaction reads deliver full content instead of stubs.
225        if !fresh {
226            if let Ok(data_dir) = crate::core::data_dir::lean_ctx_data_dir() {
227                if let Ok(mut cache) = cache_lock.try_write() {
228                    crate::server::compaction_sync::sync_if_compacted(&mut cache, &data_dir);
229                }
230            }
231        }
232
233        // Fast path: if both per-file lock and cache write-lock are immediately
234        // available, execute inline without spawning a thread. This avoids thread +
235        // channel overhead for the ~90% of calls that are cache hits.
236        let read_timeout = std::time::Duration::from_secs(30);
237        let cancelled = Arc::new(AtomicBool::new(false));
238        let (output, resolved_mode, original, is_cache_hit, file_ref, cache_stats) = {
239            let crp_mode = ctx.crp_mode;
240            let task_ref = current_task.as_deref();
241
242            let fast_result = 'fast: {
243                let file_lock = per_file_lock(path);
244                let Some(_file_guard) = file_lock.try_lock().ok() else {
245                    break 'fast None;
246                };
247                let Some(mut cache) = cache_lock.try_write().ok() else {
248                    break 'fast None;
249                };
250                let read_output = if fresh {
251                    crate::tools::ctx_read::handle_fresh_with_task_resolved(
252                        &mut cache, path, &mode, crp_mode, task_ref,
253                    )
254                } else {
255                    crate::tools::ctx_read::handle_with_task_resolved(
256                        &mut cache, path, &mode, crp_mode, task_ref,
257                    )
258                };
259                let content = read_output.content;
260                let rmode = read_output.resolved_mode;
261                let orig = cache.get(path).map_or(0, |e| e.original_tokens);
262                let hit = content.contains(" cached ")
263                    || content.contains("[unchanged")
264                    || content.contains("[delta:");
265                let fref = cache.file_ref_map().get(path).cloned();
266                let stats = cache.get_stats();
267                let stats_snapshot = (stats.total_reads, stats.cache_hits);
268                Some((content, rmode, orig, hit, fref, stats_snapshot))
269            };
270
271            if let Some(result) = fast_result {
272                result
273            } else {
274                // Slow path: spawn thread with bounded timeout for contended locks.
275                let cache_lock = cache_lock.clone();
276                let mode = mode.clone();
277                let task_owned = current_task.clone();
278                let path_owned = path.to_string();
279                let cancel_flag = cancelled.clone();
280                let (tx, rx) = std::sync::mpsc::sync_channel(1);
281                std::thread::spawn(move || {
282                    let file_lock = per_file_lock(&path_owned);
283
284                    // Bounded per-file lock: if a zombie thread still holds it, don't
285                    // wait forever. 25s keeps us inside the 30s recv_timeout.
286                    let _file_guard = {
287                        let deadline =
288                            std::time::Instant::now() + std::time::Duration::from_secs(25);
289                        loop {
290                            if cancel_flag.load(Ordering::Relaxed) {
291                                return;
292                            }
293                            if let Ok(guard) = file_lock.try_lock() {
294                                break guard;
295                            }
296                            if std::time::Instant::now() >= deadline {
297                                tracing::error!(
298                                    "ctx_read: per-file lock timeout after 25s for {path_owned}"
299                                );
300                                let _ = tx.send((
301                                    format!("per-file lock contention for {path_owned} — retry in a moment"),
302                                    "error".to_string(), 0, false, None, (0, 0),
303                                ));
304                                return;
305                            }
306                            std::thread::sleep(std::time::Duration::from_millis(50));
307                        }
308                    };
309
310                    if cancel_flag.load(Ordering::Relaxed) {
311                        return;
312                    }
313
314                    // Bounded cache write-lock: avoids indefinite block when a zombie
315                    // thread from a previous timed-out call still holds the lock.
316                    let mut cache = {
317                        let deadline =
318                            std::time::Instant::now() + std::time::Duration::from_secs(25);
319                        loop {
320                            if cancel_flag.load(Ordering::Relaxed) {
321                                return;
322                            }
323                            if let Ok(guard) = cache_lock.try_write() {
324                                break guard;
325                            }
326                            if std::time::Instant::now() >= deadline {
327                                tracing::error!(
328                                    "ctx_read: cache write-lock timeout after 25s for {path_owned}"
329                                );
330                                let _ = tx.send((
331                                    format!("cache lock contention for {path_owned} — retry in a moment"),
332                                    "error".to_string(), 0, false, None, (0, 0),
333                                ));
334                                return;
335                            }
336                            std::thread::sleep(std::time::Duration::from_millis(50));
337                        }
338                    };
339
340                    let task_ref = task_owned.as_deref();
341                    let read_output = if fresh {
342                        crate::tools::ctx_read::handle_fresh_with_task_resolved(
343                            &mut cache,
344                            &path_owned,
345                            &mode,
346                            crp_mode,
347                            task_ref,
348                        )
349                    } else {
350                        crate::tools::ctx_read::handle_with_task_resolved(
351                            &mut cache,
352                            &path_owned,
353                            &mode,
354                            crp_mode,
355                            task_ref,
356                        )
357                    };
358                    let content = read_output.content;
359                    let rmode = read_output.resolved_mode;
360                    let orig = cache.get(&path_owned).map_or(0, |e| e.original_tokens);
361                    let hit = content.contains(" cached ");
362                    let fref = cache.file_ref_map().get(path_owned.as_str()).cloned();
363                    let stats = cache.get_stats();
364                    let stats_snapshot = (stats.total_reads, stats.cache_hits);
365                    let _ = tx.send((content, rmode, orig, hit, fref, stats_snapshot));
366                });
367                if let Ok(result) = rx.recv_timeout(read_timeout) {
368                    result
369                } else {
370                    cancelled.store(true, Ordering::Relaxed);
371                    tracing::error!("ctx_read timed out after {read_timeout:?} for {path}");
372                    let msg = format!(
373                        "ERROR: ctx_read timed out after {}s reading {path}. \
374                     The file may be very large or a blocking I/O issue occurred. \
375                     Try mode=\"lines:1-100\" for a partial read.",
376                        read_timeout.as_secs()
377                    );
378                    return Err(ErrorData::internal_error(msg, None));
379                }
380            } // end else (slow path)
381        };
382
383        // Convert error results to proper MCP ErrorData instead of success body
384        if resolved_mode == "error" {
385            return Err(ErrorData::invalid_params(output, None));
386        }
387
388        let output_tokens = crate::core::tokens::count_tokens(&output);
389        let saved = original.saturating_sub(output_tokens);
390
391        // Session updates (bounded lock — 10s timeout, read already succeeded)
392        let mut ensured_root: Option<String> = None;
393        let project_root_snapshot;
394        {
395            let session_guard = tokio::task::block_in_place(|| {
396                let rt = tokio::runtime::Handle::current();
397                rt.block_on(tokio::time::timeout(
398                    std::time::Duration::from_secs(10),
399                    session_lock.write(),
400                ))
401            });
402            if let Ok(mut session) = session_guard {
403                session.touch_file(path, file_ref.as_deref(), &resolved_mode, original);
404                if is_cache_hit {
405                    session.record_cache_hit();
406                }
407                if session.active_structured_intent.is_none() && session.files_touched.len() >= 2 {
408                    let touched: Vec<String> = session
409                        .files_touched
410                        .iter()
411                        .map(|f| f.path.clone())
412                        .collect();
413                    let inferred =
414                        crate::core::intent_engine::StructuredIntent::from_file_patterns(&touched);
415                    if inferred.confidence >= 0.4 {
416                        session.active_structured_intent = Some(inferred);
417                    }
418                }
419                let root_missing = session
420                    .project_root
421                    .as_deref()
422                    .is_none_or(|r| r.trim().is_empty());
423                if root_missing {
424                    if let Some(root) = crate::core::protocol::detect_project_root(path) {
425                        session.project_root = Some(root.clone());
426                        ensured_root = Some(root);
427                    }
428                }
429                project_root_snapshot = session
430                    .project_root
431                    .clone()
432                    .unwrap_or_else(|| ".".to_string());
433            } else {
434                tracing::warn!(
435                    "session write-lock timeout (5s) in ctx_read post-update for {path}"
436                );
437                project_root_snapshot = ctx.project_root.clone();
438            }
439        }
440
441        if let Some(root) = ensured_root.as_deref() {
442            crate::core::index_orchestrator::ensure_all_background(root);
443        }
444
445        crate::core::heatmap::record_file_access(path, original, saved);
446
447        // Mode predictor + feedback — no locks needed, uses snapshots from above
448        {
449            let sig = crate::core::mode_predictor::FileSignature::from_path(path, original);
450            let density = if output_tokens > 0 {
451                original as f64 / output_tokens as f64
452            } else {
453                1.0
454            };
455            let outcome = crate::core::mode_predictor::ModeOutcome {
456                mode: resolved_mode.clone(),
457                tokens_in: original,
458                tokens_out: output_tokens,
459                density: density.min(1.0),
460            };
461            let mut predictor = crate::core::mode_predictor::ModePredictor::new();
462            predictor.set_project_root(&project_root_snapshot);
463            predictor.record(sig, outcome);
464            predictor.save();
465
466            let ext = std::path::Path::new(path)
467                .extension()
468                .and_then(|e| e.to_str())
469                .unwrap_or("")
470                .to_string();
471            let thresholds = crate::core::adaptive_thresholds::thresholds_for_path(path);
472            let feedback_outcome = crate::core::feedback::CompressionOutcome {
473                session_id: format!("{}", std::process::id()),
474                language: ext,
475                entropy_threshold: thresholds.bpe_entropy,
476                jaccard_threshold: thresholds.jaccard,
477                total_turns: cache_stats.0 as u32,
478                tokens_saved: saved as u64,
479                tokens_original: original as u64,
480                cache_hits: cache_stats.1 as u32,
481                total_reads: cache_stats.0 as u32,
482                task_completed: true,
483                timestamp: chrono::Local::now().to_rfc3339(),
484            };
485            let mut store = crate::core::feedback::FeedbackStore::load();
486            store.project_root = Some(project_root_snapshot.clone());
487            store.record_outcome(feedback_outcome);
488        }
489
490        if let Some(aid) = resolved_agent_id.as_deref() {
491            crate::core::agent_budget::record_consumption(aid, output_tokens);
492        }
493
494        // Cross-source hints: if a graph index exists and has cross-source edges
495        // pointing to this file, append compact hints so the agent knows about
496        // related issues/PRs/schemas without a separate tool call.
497        let hints_suffix = {
498            if let Some(index) = crate::core::graph_index::ProjectIndex::load(&ctx.project_root) {
499                let hints = crate::core::cross_source_hints::hints_for_file(path, &index.edges);
500                if hints.is_empty() {
501                    String::new()
502                } else {
503                    crate::core::cross_source_hints::format_hints(&hints)
504                }
505            } else {
506                String::new()
507            }
508        };
509
510        let mut warnings = Vec::new();
511        if let Some(ref w) = budget_warning {
512            warnings.push(w.as_str());
513        }
514        if let Some(ref w) = degrade_warning {
515            warnings.push(w.as_str());
516        }
517        let final_output = if !warnings.is_empty() {
518            format!("{output}{hints_suffix}\n\n{}", warnings.join("\n"))
519        } else if hints_suffix.is_empty() {
520            output
521        } else {
522            format!("{output}{hints_suffix}")
523        };
524
525        Ok(ToolOutput {
526            text: final_output,
527            original_tokens: original,
528            saved_tokens: saved,
529            mode: Some(resolved_mode),
530            path: Some(path.to_string()),
531            changed: false,
532        })
533    }
534}
535
536fn apply_verdict(
537    mode: &str,
538    verdict: crate::core::degradation_policy::DegradationVerdictV1,
539) -> (String, bool) {
540    use crate::core::degradation_policy::DegradationVerdictV1;
541    match verdict {
542        DegradationVerdictV1::Ok => (mode.to_string(), false),
543        DegradationVerdictV1::Warn => match mode {
544            "full" => ("map".to_string(), true),
545            other => (other.to_string(), false),
546        },
547        DegradationVerdictV1::Throttle => match mode {
548            "full" | "map" => ("signatures".to_string(), true),
549            other => (other.to_string(), false),
550        },
551        DegradationVerdictV1::Block => {
552            if mode == "signatures" {
553                ("signatures".to_string(), false)
554            } else {
555                ("signatures".to_string(), true)
556            }
557        }
558    }
559}
560
561fn auto_degrade_read_mode(mode: &str) -> (String, Option<String>) {
562    if crate::core::config::Config::load().no_degrade_effective() {
563        return (mode.to_string(), None);
564    }
565    let profile = crate::core::profiles::active_profile();
566    if !profile.degradation.enforce_effective() {
567        return (mode.to_string(), None);
568    }
569    let policy = crate::core::degradation_policy::evaluate_v1_for_tool("ctx_read", None);
570    let (new_mode, degraded) = apply_verdict(mode, policy.decision.verdict);
571    let warning = if degraded {
572        Some(format!(
573            "⚠ Context pressure: mode={mode} was downgraded to mode={new_mode} \
574             (verdict: {:?}). Use start_line=1 to bypass, or run ctx_compress to free budget.",
575            policy.decision.verdict
576        ))
577    } else {
578        None
579    };
580    (new_mode, warning)
581}
582
583#[cfg(test)]
584mod tests {
585    use super::*;
586    use std::sync::atomic::{AtomicUsize, Ordering};
587
588    #[test]
589    fn per_file_lock_same_path_returns_same_mutex() {
590        let lock_a1 = per_file_lock("/tmp/test_same_path.txt");
591        let lock_a2 = per_file_lock("/tmp/test_same_path.txt");
592        assert!(Arc::ptr_eq(&lock_a1, &lock_a2));
593    }
594
595    #[test]
596    fn per_file_lock_different_paths_return_different_mutexes() {
597        let lock_a = per_file_lock("/tmp/test_path_a.txt");
598        let lock_b = per_file_lock("/tmp/test_path_b.txt");
599        assert!(!Arc::ptr_eq(&lock_a, &lock_b));
600    }
601
602    #[test]
603    fn per_file_lock_serializes_concurrent_access() {
604        let counter = Arc::new(AtomicUsize::new(0));
605        let max_concurrent = Arc::new(AtomicUsize::new(0));
606        let path = "/tmp/test_concurrent_serialization.txt";
607        let mut handles = Vec::new();
608
609        for _ in 0..5 {
610            let counter = counter.clone();
611            let max_concurrent = max_concurrent.clone();
612            let path = path.to_string();
613            handles.push(std::thread::spawn(move || {
614                let lock = per_file_lock(&path);
615                let _guard = lock.lock().unwrap();
616                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
617                max_concurrent.fetch_max(active, Ordering::SeqCst);
618                std::thread::sleep(std::time::Duration::from_millis(10));
619                counter.fetch_sub(1, Ordering::SeqCst);
620            }));
621        }
622
623        for h in handles {
624            h.join().unwrap();
625        }
626
627        assert_eq!(max_concurrent.load(Ordering::SeqCst), 1);
628    }
629
630    #[test]
631    fn per_file_lock_allows_parallel_different_paths() {
632        let counter = Arc::new(AtomicUsize::new(0));
633        let max_concurrent = Arc::new(AtomicUsize::new(0));
634        let mut handles = Vec::new();
635
636        for i in 0..4 {
637            let counter = counter.clone();
638            let max_concurrent = max_concurrent.clone();
639            let path = format!("/tmp/test_parallel_{i}.txt");
640            handles.push(std::thread::spawn(move || {
641                let lock = per_file_lock(&path);
642                let _guard = lock.lock().unwrap();
643                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
644                max_concurrent.fetch_max(active, Ordering::SeqCst);
645                std::thread::sleep(std::time::Duration::from_millis(50));
646                counter.fetch_sub(1, Ordering::SeqCst);
647            }));
648        }
649
650        for h in handles {
651            h.join().unwrap();
652        }
653
654        assert!(max_concurrent.load(Ordering::SeqCst) > 1);
655    }
656
657    /// Regression test for Issue #229: a zombie thread holding the cache write-lock
658    /// must not block subsequent reads indefinitely. The try_write() loop inside
659    /// the spawned thread should respect its 25s deadline and the cancellation flag.
660    #[test]
661    fn zombie_thread_does_not_block_subsequent_cache_access() {
662        let cache: Arc<tokio::sync::RwLock<u32>> = Arc::new(tokio::sync::RwLock::new(0));
663
664        // Simulate a zombie: hold the write-lock on a background thread for 2s.
665        let zombie_lock = cache.clone();
666        let _zombie = std::thread::spawn(move || {
667            let _guard = zombie_lock.blocking_write();
668            std::thread::sleep(std::time::Duration::from_secs(2));
669        });
670        std::thread::sleep(std::time::Duration::from_millis(50));
671
672        // A try_read() must fail immediately (zombie holds write-lock).
673        assert!(cache.try_read().is_err());
674
675        // A try_write() loop with cancellation must exit promptly.
676        let cancel = Arc::new(AtomicBool::new(false));
677        let cancel2 = cancel.clone();
678        let lock2 = cache.clone();
679        let waiter = std::thread::spawn(move || {
680            let start = std::time::Instant::now();
681            loop {
682                if cancel2.load(Ordering::Relaxed) {
683                    return (false, start.elapsed());
684                }
685                if let Ok(_guard) = lock2.try_write() {
686                    return (true, start.elapsed());
687                }
688                std::thread::sleep(std::time::Duration::from_millis(50));
689            }
690        });
691
692        // Set cancellation after 200ms — the loop should exit quickly.
693        std::thread::sleep(std::time::Duration::from_millis(200));
694        cancel.store(true, Ordering::Relaxed);
695
696        let (acquired, elapsed) = waiter.join().unwrap();
697        assert!(
698            !acquired,
699            "should not have acquired lock while zombie holds it"
700        );
701        assert!(
702            elapsed < std::time::Duration::from_secs(1),
703            "cancellation should have stopped the loop promptly"
704        );
705    }
706
707    // -- Regression: GitHub Issue #253 + #259 --
708    // Helper that mirrors the runtime start_line logic.
709    fn apply_start_line(
710        mode: &mut String,
711        fresh: &mut bool,
712        explicit_mode: bool,
713        start_line: Option<i64>,
714    ) {
715        if let Some(sl) = start_line {
716            let sl = sl.max(1_i64);
717            if sl <= 1 {
718                return;
719            }
720            *fresh = true;
721            if !explicit_mode || mode.starts_with("lines") {
722                *mode = format!("lines:{sl}-999999");
723            }
724        }
725    }
726
727    #[test]
728    fn start_line_1_does_not_override_mode() {
729        let mut mode = "auto".to_string();
730        let mut fresh = false;
731        apply_start_line(&mut mode, &mut fresh, false, Some(1));
732        assert_eq!(mode, "auto", "start_line=1 should not change mode");
733        assert!(!fresh, "start_line=1 should not force fresh=true");
734    }
735
736    #[test]
737    fn start_line_gt1_overrides_implicit_mode() {
738        let mut mode = "auto".to_string();
739        let mut fresh = false;
740        apply_start_line(&mut mode, &mut fresh, false, Some(50));
741        assert_eq!(mode, "lines:50-999999");
742        assert!(fresh);
743    }
744
745    #[test]
746    fn start_line_gt1_does_not_override_explicit_map() {
747        // GitHub #259: mode=map + start_line=50 → mode stays map
748        let mut mode = "map".to_string();
749        let mut fresh = false;
750        apply_start_line(&mut mode, &mut fresh, true, Some(50));
751        assert_eq!(
752            mode, "map",
753            "explicit mode=map must not be clobbered by start_line"
754        );
755        assert!(fresh, "start_line>1 should still force fresh");
756    }
757
758    #[test]
759    fn start_line_gt1_does_not_override_explicit_signatures() {
760        let mut mode = "signatures".to_string();
761        let mut fresh = false;
762        apply_start_line(&mut mode, &mut fresh, true, Some(100));
763        assert_eq!(mode, "signatures");
764        assert!(fresh);
765    }
766
767    #[test]
768    fn start_line_gt1_honors_explicit_lines_mode() {
769        let mut mode = "lines:1-50".to_string();
770        let mut fresh = false;
771        apply_start_line(&mut mode, &mut fresh, true, Some(30));
772        assert_eq!(
773            mode, "lines:30-999999",
774            "explicit lines mode should accept start_line override"
775        );
776        assert!(fresh);
777    }
778
779    #[test]
780    fn start_line_none_does_nothing() {
781        let mut mode = "map".to_string();
782        let mut fresh = false;
783        apply_start_line(&mut mode, &mut fresh, true, None);
784        assert_eq!(mode, "map");
785        assert!(!fresh);
786    }
787
788    #[test]
789    fn start_line_1_with_explicit_mode_preserves_it() {
790        // OpenCode sends start_line=1 + mode=map — both should be preserved
791        let mut mode = "map".to_string();
792        let mut fresh = false;
793        apply_start_line(&mut mode, &mut fresh, true, Some(1));
794        assert_eq!(mode, "map");
795        assert!(!fresh);
796    }
797
798    // -- Regression: GitHub Issue #262 --
799    // auto_degrade_read_mode must produce a warning when mode is downgraded.
800
801    use crate::core::degradation_policy::DegradationVerdictV1;
802
803    #[test]
804    fn verdict_ok_does_not_degrade() {
805        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Ok);
806        assert_eq!(mode, "full");
807        assert!(!degraded);
808    }
809
810    #[test]
811    fn verdict_warn_degrades_full_to_map() {
812        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
813        assert_eq!(mode, "map");
814        assert!(degraded, "full→map must be flagged as degraded");
815    }
816
817    #[test]
818    fn verdict_warn_keeps_map() {
819        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Warn);
820        assert_eq!(mode, "map");
821        assert!(!degraded, "map is not degraded under Warn");
822    }
823
824    #[test]
825    fn verdict_warn_keeps_signatures() {
826        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Warn);
827        assert_eq!(mode, "signatures");
828        assert!(!degraded);
829    }
830
831    #[test]
832    fn verdict_throttle_degrades_full_to_signatures() {
833        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Throttle);
834        assert_eq!(mode, "signatures");
835        assert!(degraded);
836    }
837
838    #[test]
839    fn verdict_throttle_degrades_map_to_signatures() {
840        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Throttle);
841        assert_eq!(mode, "signatures");
842        assert!(degraded);
843    }
844
845    #[test]
846    fn verdict_throttle_keeps_lines() {
847        let (mode, degraded) = super::apply_verdict("lines:1-50", DegradationVerdictV1::Throttle);
848        assert_eq!(mode, "lines:1-50");
849        assert!(!degraded, "lines mode bypasses degradation");
850    }
851
852    #[test]
853    fn verdict_block_degrades_full_to_signatures() {
854        let (mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Block);
855        assert_eq!(mode, "signatures");
856        assert!(degraded);
857    }
858
859    #[test]
860    fn verdict_block_does_not_degrade_signatures() {
861        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Block);
862        assert_eq!(mode, "signatures");
863        assert!(!degraded, "already at signatures — no degradation needed");
864    }
865
866    #[test]
867    fn degrade_warning_message_contains_mode_info() {
868        let (new_mode, degraded) = super::apply_verdict("full", DegradationVerdictV1::Warn);
869        assert!(degraded);
870        let warning = format!(
871            "⚠ Context pressure: mode=full was downgraded to mode={new_mode} (verdict: {:?}).",
872            DegradationVerdictV1::Warn
873        );
874        assert!(warning.contains("mode=full"));
875        assert!(warning.contains("mode=map"));
876        assert!(warning.contains("Warn"));
877    }
878
879    // --- auto_degrade_read_mode: no_degrade integration ---
880    // With default config (no LCTX_NO_DEGRADE), the profile's degradation.enforce
881    // is also off by default, so auto_degrade_read_mode returns mode unchanged.
882
883    #[test]
884    fn auto_degrade_preserves_full_when_default_config() {
885        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
886            return;
887        }
888        let (mode, warning) = super::auto_degrade_read_mode("full");
889        assert_eq!(mode, "full");
890        assert!(warning.is_none());
891    }
892
893    #[test]
894    fn auto_degrade_preserves_map_when_default_config() {
895        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
896            return;
897        }
898        let (mode, warning) = super::auto_degrade_read_mode("map");
899        assert_eq!(mode, "map");
900        assert!(warning.is_none());
901    }
902
903    #[test]
904    fn auto_degrade_preserves_signatures_when_default_config() {
905        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
906            return;
907        }
908        let (mode, warning) = super::auto_degrade_read_mode("signatures");
909        assert_eq!(mode, "signatures");
910        assert!(warning.is_none());
911    }
912
913    #[test]
914    fn auto_degrade_preserves_diff_always() {
915        let (mode, warning) = super::auto_degrade_read_mode("diff");
916        assert_eq!(mode, "diff");
917        assert!(warning.is_none());
918    }
919
920    #[test]
921    fn auto_degrade_preserves_lines_mode_always() {
922        let (mode, warning) = super::auto_degrade_read_mode("lines:10-50");
923        assert_eq!(mode, "lines:10-50");
924        assert!(warning.is_none());
925    }
926
927    #[test]
928    fn auto_degrade_preserves_aggressive_when_default_config() {
929        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
930            return;
931        }
932        let (mode, warning) = super::auto_degrade_read_mode("aggressive");
933        assert_eq!(mode, "aggressive");
934        assert!(warning.is_none());
935    }
936
937    #[test]
938    fn auto_degrade_preserves_entropy_when_default_config() {
939        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
940            return;
941        }
942        let (mode, warning) = super::auto_degrade_read_mode("entropy");
943        assert_eq!(mode, "entropy");
944        assert!(warning.is_none());
945    }
946
947    #[test]
948    fn auto_degrade_preserves_auto_when_default_config() {
949        if std::env::var("LCTX_NO_DEGRADE").is_ok() {
950            return;
951        }
952        let (mode, warning) = super::auto_degrade_read_mode("auto");
953        assert_eq!(mode, "auto");
954        assert!(warning.is_none());
955    }
956
957    // --- apply_verdict: exhaustive mode × verdict matrix ---
958
959    #[test]
960    fn verdict_warn_does_not_degrade_diff() {
961        let (mode, degraded) = super::apply_verdict("diff", DegradationVerdictV1::Warn);
962        assert_eq!(mode, "diff");
963        assert!(!degraded);
964    }
965
966    #[test]
967    fn verdict_throttle_does_not_degrade_signatures() {
968        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Throttle);
969        assert_eq!(mode, "signatures");
970        assert!(!degraded);
971    }
972
973    #[test]
974    fn verdict_ok_preserves_map() {
975        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Ok);
976        assert_eq!(mode, "map");
977        assert!(!degraded);
978    }
979
980    #[test]
981    fn verdict_ok_preserves_signatures() {
982        let (mode, degraded) = super::apply_verdict("signatures", DegradationVerdictV1::Ok);
983        assert_eq!(mode, "signatures");
984        assert!(!degraded);
985    }
986
987    #[test]
988    fn verdict_ok_preserves_lines() {
989        let (mode, degraded) = super::apply_verdict("lines:1-100", DegradationVerdictV1::Ok);
990        assert_eq!(mode, "lines:1-100");
991        assert!(!degraded);
992    }
993
994    #[test]
995    fn verdict_block_degrades_map_to_signatures() {
996        let (mode, degraded) = super::apply_verdict("map", DegradationVerdictV1::Block);
997        assert_eq!(mode, "signatures");
998        assert!(degraded);
999    }
1000}