Skip to main content

lean_ctx/core/
io_health.rs

1use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
2use std::sync::OnceLock;
3use std::time::Duration;
4
/// Number of freezes counted in the current window. Incremented by
/// `record_freeze`; cleared by `recent_freeze_count` once the window has lapsed.
static FREEZE_COUNT: AtomicU32 = AtomicU32::new(0);
/// Wall-clock time of the most recently recorded freeze, in milliseconds since
/// the Unix epoch. `0` means no freeze has been recorded yet.
static LAST_FREEZE_EPOCH_MS: AtomicU64 = AtomicU64::new(0);

/// How long (in milliseconds) after the last freeze the freeze count is still
/// considered current.
const FREEZE_WINDOW_MS: u64 = 60_000;
/// Freeze count at or above which `environment` reports `IoEnvironment::Degraded`.
const DEGRADED_THRESHOLD: u32 = 3;
10
/// Coarse classification of the current I/O conditions, as reported by
/// `environment`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoEnvironment {
    /// Neither of the conditions below applies.
    Fast,
    /// The process runs under WSL or (on Linux) a slow mount was found in
    /// `/proc/mounts`.
    SlowFs,
    /// The recent freeze count has reached `DEGRADED_THRESHOLD`; takes
    /// precedence over `SlowFs`.
    Degraded,
}
17
18pub fn environment() -> IoEnvironment {
19    if recent_freeze_count() >= DEGRADED_THRESHOLD {
20        return IoEnvironment::Degraded;
21    }
22    if is_slow_environment() {
23        return IoEnvironment::SlowFs;
24    }
25    IoEnvironment::Fast
26}
27
28pub fn record_freeze() {
29    FREEZE_COUNT.fetch_add(1, Ordering::Relaxed);
30    let now = epoch_ms();
31    LAST_FREEZE_EPOCH_MS.store(now, Ordering::Relaxed);
32    tracing::debug!(
33        "io_health: freeze recorded (total in window: {})",
34        recent_freeze_count()
35    );
36}
37
38pub fn recent_freeze_count() -> u32 {
39    let last = LAST_FREEZE_EPOCH_MS.load(Ordering::Relaxed);
40    if last == 0 {
41        return 0;
42    }
43    let now = epoch_ms();
44    if now.saturating_sub(last) > FREEZE_WINDOW_MS {
45        FREEZE_COUNT.store(0, Ordering::Relaxed);
46        return 0;
47    }
48    FREEZE_COUNT.load(Ordering::Relaxed)
49}
50
51/// Returns an adaptive timeout: longer in slow/degraded environments to avoid
52/// a death spiral where shorter timeouts cause more timeouts.
53pub fn adaptive_timeout(base: Duration) -> Duration {
54    match environment() {
55        IoEnvironment::Fast => base,
56        IoEnvironment::SlowFs => base.mul_f32(1.5),
57        IoEnvironment::Degraded => base.mul_f32(2.0),
58    }
59}
60
/// Reports whether the process appears to run under WSL.
///
/// On Linux, `/proc/version` is read once (the answer is cached for the
/// lifetime of the process) and searched case-insensitively for "microsoft" or
/// "wsl"; an unreadable file counts as "not WSL". On every other target this
/// is always `false`.
pub fn is_wsl() -> bool {
    #[cfg(target_os = "linux")]
    fn probe() -> bool {
        static CACHED: OnceLock<bool> = OnceLock::new();
        *CACHED.get_or_init(|| match std::fs::read_to_string("/proc/version") {
            Ok(banner) => {
                let banner = banner.to_lowercase();
                ["microsoft", "wsl"]
                    .iter()
                    .any(|needle| banner.contains(*needle))
            }
            Err(_) => false,
        })
    }

    #[cfg(not(target_os = "linux"))]
    fn probe() -> bool {
        false
    }

    probe()
}
77
78/// Detects if a path is likely on a slow filesystem (DrvFS, NFS, FUSE, sshfs).
79pub fn is_slow_mount(path: &str) -> bool {
80    if is_wsl() && path.starts_with("/mnt/") {
81        return true;
82    }
83    #[cfg(target_os = "linux")]
84    {
85        static SLOW_PREFIXES: OnceLock<Vec<String>> = OnceLock::new();
86        let prefixes = SLOW_PREFIXES.get_or_init(detect_slow_mount_prefixes);
87        for prefix in prefixes {
88            if path.starts_with(prefix.as_str()) {
89                return true;
90            }
91        }
92    }
93    false
94}
95
96fn is_slow_environment() -> bool {
97    static SLOW_ENV: OnceLock<bool> = OnceLock::new();
98    *SLOW_ENV.get_or_init(|| {
99        if is_wsl() {
100            return true;
101        }
102        #[cfg(target_os = "linux")]
103        {
104            if has_nfs_or_fuse_mounts() {
105                return true;
106            }
107        }
108        false
109    })
110}
111
/// Reads `/proc/mounts` and returns the mount points whose filesystem type is
/// one of the known slow ones (NFS, CIFS/SMB, FUSE/sshfs, 9p, DrvFS).
///
/// Returns an empty list when `/proc/mounts` cannot be read.
#[cfg(target_os = "linux")]
fn detect_slow_mount_prefixes() -> Vec<String> {
    std::fs::read_to_string("/proc/mounts")
        .map(|mounts| parse_slow_mount_prefixes(&mounts))
        .unwrap_or_default()
}

/// Extracts the slow mount points from text in `/proc/mounts` format
/// (`device mount_point fs_type options ...`, one mount per line).
///
/// Lines with fewer than three fields are skipped. Mount points are returned
/// with their octal escapes decoded so they compare equal to real paths.
#[cfg(target_os = "linux")]
fn parse_slow_mount_prefixes(mounts: &str) -> Vec<String> {
    mounts
        .lines()
        .filter_map(|line| {
            let mut fields = line.split_whitespace();
            let _device = fields.next()?;
            let mount_point = fields.next()?;
            let fs_type = fields.next()?;
            matches!(
                fs_type,
                "nfs" | "nfs4" | "cifs" | "smbfs" | "fuse" | "fuse.sshfs" | "9p" | "drvfs"
            )
            .then(|| unescape_mount_field(mount_point))
        })
        .collect()
}

/// Decodes the `\ooo` octal escapes the kernel uses in `/proc/mounts` for
/// space (`\040`), tab (`\011`), newline (`\012`) and backslash (`\134`).
///
/// A backslash that is not followed by three octal digits forming a byte value
/// is kept verbatim.
#[cfg(target_os = "linux")]
fn unescape_mount_field(field: &str) -> String {
    /// Parses exactly three octal digits into a byte, if they fit in one.
    fn parse_octal_triplet(digits: &[u8]) -> Option<u8> {
        let value = digits.iter().try_fold(0u32, |acc, &digit| {
            matches!(digit, b'0'..=b'7').then(|| acc * 8 + u32::from(digit - b'0'))
        })?;
        u8::try_from(value).ok()
    }

    let mut decoded = Vec::with_capacity(field.len());
    let mut rest = field.as_bytes();
    while let Some((&byte, tail)) = rest.split_first() {
        if byte == b'\\' {
            if let Some(code) = tail.get(..3).and_then(parse_octal_triplet) {
                decoded.push(code);
                rest = &tail[3..];
                continue;
            }
        }
        decoded.push(byte);
        rest = tail;
    }
    // Escapes may in principle decode to bytes that are not valid UTF-8;
    // degrade gracefully rather than fail.
    String::from_utf8_lossy(&decoded).into_owned()
}
134
135#[cfg(target_os = "linux")]
136fn has_nfs_or_fuse_mounts() -> bool {
137    !detect_slow_mount_prefixes().is_empty()
138}
139
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn epoch_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// `environment` always yields one of the three known variants.
    #[test]
    fn environment_returns_valid_state() {
        let state = environment();
        let recognised = matches!(
            state,
            IoEnvironment::Fast | IoEnvironment::SlowFs | IoEnvironment::Degraded
        );
        assert!(recognised);
    }

    /// Recording a freeze raises the count seen by `recent_freeze_count`.
    #[test]
    fn record_freeze_increments_count() {
        let baseline = recent_freeze_count();
        record_freeze();
        let after = recent_freeze_count();
        assert!(after > baseline);
    }

    /// Five freezes exceed the degraded threshold, so the timeout must grow.
    #[test]
    fn adaptive_timeout_increases_in_degraded() {
        let base = Duration::from_secs(10);
        (0..5).for_each(|_| record_freeze());
        let adapted = adaptive_timeout(base);
        assert!(
            adapted > base,
            "degraded environment should get longer timeout, got {adapted:?} for base {base:?}"
        );
    }

    /// Ordinary local paths are not classified as slow mounts.
    #[test]
    fn is_slow_mount_false_for_local_paths() {
        for local_path in ["/home/user/project/src/main.rs", "/tmp/test.txt"] {
            assert!(!is_slow_mount(local_path));
        }
    }
}