Skip to main content

mars_agents/models/probes/
pi_cache.rs

1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use serde::{Deserialize, Serialize};
6
7use super::pi::PiProbeResult;
8use super::probe_refresh::ProbeCacheBranch;
9use crate::error::MarsError;
10
/// On-disk cache format version; entries recorded with any other version are
/// rejected by the reader instead of being reused.
const SCHEMA_VERSION: u32 = 2;
/// Freshness window, in seconds, used when `MARS_PROBE_CACHE_TTL_SECS` is
/// unset or does not parse as a `u64`.
const DEFAULT_TTL_SECS: u64 = 60;
13
/// Serialized record of a `pi` probe attempt, stored as JSON in the cache file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PiProbeCacheEntry {
    /// Format version of this record; compared against `SCHEMA_VERSION` on read.
    pub schema_version: u32,
    /// Harness name; the reader only accepts `"pi"` (ASCII case-insensitive).
    pub harness: String,
    /// Unix timestamp in seconds; this is the value the freshness check uses.
    pub fetched_at: u64,
    /// Unix timestamp in seconds of the attempt; the writer in this file sets
    /// it to the same value as `fetched_at`.
    pub last_attempt_at: u64,
    /// Copy of the probe result's `error` field taken when the entry was written.
    pub last_error: Option<String>,
    /// The recorded probe result, if any.
    pub result: Option<PiProbeResult>,
}
23
/// Outcome of a cache-aware `pi` probe lookup.
#[derive(Debug, Clone)]
pub enum CachedPiProbeOutcome {
    /// A cached result that passed the freshness check was returned.
    Hit(PiProbeResult),
    /// A cached result was returned via the resolver's "stale" branch.
    /// NOTE(review): presumably the entry had outlived its TTL and a background
    /// refresh was requested — confirm in `probe_refresh`.
    Stale(PiProbeResult),
    /// The probe was executed during this call and its result is carried here.
    Miss(PiProbeResult),
    /// No result is available (probing was skipped or the resolver reported
    /// nothing usable).
    Unavailable,
}
31
32impl CachedPiProbeOutcome {
33    pub fn result(&self) -> Option<&PiProbeResult> {
34        match self {
35            Self::Hit(r) | Self::Stale(r) | Self::Miss(r) => Some(r),
36            Self::Unavailable => None,
37        }
38    }
39
40    pub fn cache_status(&self) -> &'static str {
41        match self {
42            Self::Hit(_) => "hit",
43            Self::Stale(_) => "stale",
44            Self::Miss(_) => "miss",
45            Self::Unavailable => "skipped",
46        }
47    }
48}
49
/// Decides whether the `pi` harness should be probed at all.
///
/// Probing is skipped when running offline, and otherwise only happens when
/// `installed` contains the exact name `"pi"`.
fn should_probe_pi(installed: &HashSet<String>, is_offline: bool) -> bool {
    if is_offline {
        return false;
    }
    installed.contains("pi")
}
53
54fn cache_dir() -> Result<PathBuf, MarsError> {
55    let root = crate::platform::cache::global_cache_root()?;
56    Ok(root.join("availability"))
57}
58
59fn cache_path() -> Result<PathBuf, MarsError> {
60    Ok(cache_dir()?.join("pi.json"))
61}
62
63fn lock_path() -> Result<PathBuf, MarsError> {
64    Ok(cache_dir()?.join(".pi.lock"))
65}
66
67fn ttl_secs() -> u64 {
68    std::env::var("MARS_PROBE_CACHE_TTL_SECS")
69        .ok()
70        .and_then(|v| v.parse::<u64>().ok())
71        .unwrap_or(DEFAULT_TTL_SECS)
72}
73
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
80
81fn is_fresh(entry: &PiProbeCacheEntry) -> bool {
82    let ttl = ttl_secs();
83    let now = now_unix_secs();
84    if entry.fetched_at > now {
85        return false;
86    }
87    (now - entry.fetched_at) < ttl
88}
89
90fn read_cache_tolerant() -> Option<PiProbeCacheEntry> {
91    read_cache_tolerant_at(&cache_path().ok()?)
92}
93
94fn read_cache_tolerant_at(path: &Path) -> Option<PiProbeCacheEntry> {
95    let content = std::fs::read_to_string(path).ok()?;
96    let entry: PiProbeCacheEntry = serde_json::from_str(&content).ok()?;
97    if entry.schema_version != SCHEMA_VERSION {
98        return None;
99    }
100    if !entry.harness.eq_ignore_ascii_case("pi") {
101        return None;
102    }
103    Some(entry)
104}
105
106fn write_cache_at(path: &Path, entry: &PiProbeCacheEntry) -> Result<(), MarsError> {
107    let json = serde_json::to_string_pretty(entry)
108        .map_err(|e| MarsError::Internal(format!("pi probe cache serialize: {e}")))?;
109    crate::fs::atomic_write(path, json.as_bytes())
110}
111
/// Guard returned after the lock file has been locked; it keeps the lock
/// file's handle open for as long as the value lives.
/// NOTE(review): presumably the OS releases the lock when the handle is closed
/// on drop — no explicit unlock call exists in this file.
struct FileLock {
    /// Never read; held only so the handle stays open until the guard is dropped.
    _file: std::fs::File,
}
115
116fn try_lock() -> Option<FileLock> {
117    lock_at(&lock_path().ok()?, true)
118}
119
120fn blocking_lock() -> Option<FileLock> {
121    lock_at(&lock_path().ok()?, false)
122}
123
124fn lock_at(path: &Path, nonblocking: bool) -> Option<FileLock> {
125    if let Some(parent) = path.parent() {
126        std::fs::create_dir_all(parent).ok()?;
127    }
128    let file = std::fs::OpenOptions::new()
129        .create(true)
130        .write(true)
131        .truncate(false)
132        .open(path)
133        .ok()?;
134
135    #[cfg(unix)]
136    {
137        use std::os::unix::io::AsRawFd;
138        let flags = if nonblocking {
139            libc::LOCK_EX | libc::LOCK_NB
140        } else {
141            libc::LOCK_EX
142        };
143        let ret = unsafe { libc::flock(file.as_raw_fd(), flags) };
144        if ret != 0 {
145            return None;
146        }
147    }
148
149    #[cfg(windows)]
150    {
151        use std::os::windows::io::AsRawHandle;
152        use windows_sys::Win32::Foundation::HANDLE;
153        use windows_sys::Win32::Storage::FileSystem::{
154            LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
155        };
156        let handle = file.as_raw_handle() as HANDLE;
157        let mut overlapped = unsafe { std::mem::zeroed() };
158        let flags = if nonblocking {
159            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY
160        } else {
161            LOCKFILE_EXCLUSIVE_LOCK
162        };
163        let ret = unsafe { LockFileEx(handle, flags, 0, 1, 0, &mut overlapped) };
164        if ret == 0 {
165            return None;
166        }
167    }
168
169    Some(FileLock { _file: file })
170}
171
172pub fn probe_cached(
173    installed: &HashSet<String>,
174    mars_offline: bool,
175    probe_refresh: super::ProbeRefreshMode,
176) -> CachedPiProbeOutcome {
177    if !should_probe_pi(installed, mars_offline) {
178        return CachedPiProbeOutcome::Unavailable;
179    }
180
181    probe_cached_impl(
182        mars_offline,
183        probe_refresh,
184        &cache_path().ok(),
185        super::pi::probe,
186        || spawn_detached_refresh().map_err(|_| ()),
187    )
188}
189
/// Core of the cache lookup, with the cache file, probe, and background
/// refresher injected so they can be substituted (the tests in this file pass
/// closures and a temporary path).
///
/// * `mars_offline` / `probe_refresh` - forwarded to the branch resolver.
/// * `path` - cache file to read (and write on a synchronous probe); `None`
///   means no cache file is available.
/// * `probe` - runs the actual probe when a synchronous probe is required.
/// * `spawn_refresh` - starts a background refresh.
///
/// The decision between hit / stale / unavailable / synchronous probe is made
/// by `resolve_probe_cache_branch`; this function only supplies the inputs and
/// maps its answer onto [`CachedPiProbeOutcome`].
fn probe_cached_impl<F, S>(
    mars_offline: bool,
    probe_refresh: super::ProbeRefreshMode,
    path: &Option<PathBuf>,
    probe: F,
    spawn_refresh: S,
) -> CachedPiProbeOutcome
where
    F: Fn() -> PiProbeResult,
    S: Fn() -> Result<(), ()>,
{
    // Unreadable, malformed, or wrong-version cache files simply yield `None` here.
    let cached = path.as_deref().and_then(read_cache_tolerant_at);
    match super::probe_refresh::resolve_probe_cache_branch(
        cached,
        mars_offline,
        probe_refresh,
        // Only expose a cached result to the resolver when it carries no error.
        |entry| {
            entry
                .result
                .as_ref()
                .filter(|result| is_usable_result(Some(*result)))
        },
        is_fresh,
        || trigger_background_refresh_with(spawn_refresh),
    ) {
        ProbeCacheBranch::Hit(result) => CachedPiProbeOutcome::Hit(result),
        ProbeCacheBranch::Stale(result) => CachedPiProbeOutcome::Stale(result),
        ProbeCacheBranch::Unavailable => CachedPiProbeOutcome::Unavailable,
        ProbeCacheBranch::SynchronousProbe => synchronous_probe_with(path, probe),
    }
}
221
222fn trigger_background_refresh_with<S>(spawn_refresh: S)
223where
224    S: Fn() -> Result<(), ()>,
225{
226    let Some(lock) = try_lock() else { return };
227    if let Some(entry) = read_cache_tolerant()
228        && is_fresh(&entry)
229        && is_usable_result(entry.result.as_ref())
230    {
231        drop(lock);
232        return;
233    }
234    let _ = spawn_refresh();
235    drop(lock);
236}
237
238fn synchronous_probe_with<F>(path: &Option<PathBuf>, probe: F) -> CachedPiProbeOutcome
239where
240    F: Fn() -> PiProbeResult,
241{
242    let lock = blocking_lock();
243
244    if lock.is_some()
245        && let Some(path) = path
246        && let Some(entry) = read_cache_tolerant_at(path)
247        && is_usable_result(entry.result.as_ref())
248    {
249        if is_fresh(&entry) {
250            return CachedPiProbeOutcome::Hit(entry.result.unwrap());
251        }
252
253        let probe_result = probe();
254        write_probe_attempt(path, probe_result.clone());
255        return CachedPiProbeOutcome::Miss(probe_result);
256    }
257
258    let probe_result = probe();
259    if let Some(path) = path {
260        write_probe_attempt(path, probe_result.clone());
261    }
262    drop(lock);
263
264    CachedPiProbeOutcome::Miss(probe_result)
265}
266
267fn write_probe_attempt(path: &Path, probe_result: PiProbeResult) {
268    let now = now_unix_secs();
269    let entry = PiProbeCacheEntry {
270        schema_version: SCHEMA_VERSION,
271        harness: "pi".to_string(),
272        fetched_at: now,
273        last_attempt_at: now,
274        last_error: probe_result.error.clone(),
275        result: Some(probe_result),
276    };
277
278    if let Err(e) = write_cache_at(path, &entry) {
279        eprintln!("debug: pi probe cache write failed: {e}");
280    }
281}
282
283fn spawn_detached_refresh() -> std::io::Result<()> {
284    let mars_bin = std::env::current_exe()?;
285    let mut cmd = std::process::Command::new(mars_bin);
286    cmd.args(["models", "__refresh-probe", "--target", "pi"]);
287    cmd.stdin(Stdio::null());
288    cmd.stdout(Stdio::null());
289    cmd.stderr(Stdio::null());
290
291    #[cfg(unix)]
292    {
293        use std::os::unix::process::CommandExt;
294        unsafe {
295            cmd.pre_exec(|| {
296                libc::setsid();
297                Ok(())
298            });
299        }
300    }
301
302    #[cfg(windows)]
303    {
304        use std::os::windows::process::CommandExt;
305        cmd.creation_flags(0x00000008);
306    }
307
308    cmd.spawn()?;
309    Ok(())
310}
311
312pub fn run_refresh_probe_command() -> Result<i32, MarsError> {
313    let Some(_lock) = blocking_lock() else {
314        return Ok(0);
315    };
316
317    if let Some(entry) = read_cache_tolerant()
318        && is_fresh(&entry)
319        && is_usable_result(entry.result.as_ref())
320    {
321        return Ok(0);
322    }
323
324    let probe_result = super::pi::probe();
325    if let Ok(path) = cache_path() {
326        write_probe_attempt(&path, probe_result);
327    }
328
329    Ok(0)
330}
331
332fn is_usable_result(result: Option<&PiProbeResult>) -> bool {
333    result.is_some_and(|probe| probe.error.is_none())
334}
335
// Unit tests for the cache lookup. They drive `probe_cached_impl` with a cache
// file inside a temporary directory and closures standing in for the probe and
// the background refresher.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Probe result with no error and one model slug; counts as usable.
    fn compatible_result() -> PiProbeResult {
        PiProbeResult {
            binary_path: "/tmp/pi".to_string(),
            version: Some("pi 0.4.2".to_string()),
            compatible: true,
            help_surface_tokens_present: vec!["--mode".to_string()],
            help_surface_tokens_missing: Vec::new(),
            model_slugs: HashSet::from(["openai/gpt-5.4".to_string()]),
            error: None,
        }
    }

    /// Probe result carrying an error; used as the stand-in probe in tests
    /// that expect the cached value to be returned instead.
    fn incompatible_result() -> PiProbeResult {
        PiProbeResult {
            compatible: false,
            help_surface_tokens_missing: vec!["--mode".to_string()],
            error: Some("missing help tokens".to_string()),
            ..PiProbeResult::default()
        }
    }

    /// Builds a current-schema `pi` cache entry stamped with `fetched_at`.
    fn entry(fetched_at: u64, result: Option<PiProbeResult>) -> PiProbeCacheEntry {
        PiProbeCacheEntry {
            schema_version: SCHEMA_VERSION,
            harness: "pi".to_string(),
            fetched_at,
            last_attempt_at: fetched_at,
            last_error: None,
            result,
        }
    }

    /// Cache file location inside the temporary directory.
    fn cache_file(temp: &TempDir) -> PathBuf {
        temp.path().join("availability").join("pi.json")
    }

    /// Writes `entry` to `path`, panicking on failure.
    fn write_entry(path: &Path, entry: &PiProbeCacheEntry) {
        write_cache_at(path, entry).unwrap();
    }

    /// A cache file written with schema version 1 (and no `model_slugs` field)
    /// must be ignored, so the probe runs and its result is returned as a miss.
    #[test]
    fn legacy_v1_cache_without_model_slugs_is_reprobed() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(
            &path,
            serde_json::json!({
                "schema_version": 1,
                "harness": "pi",
                "fetched_at": now_unix_secs(),
                "last_attempt_at": now_unix_secs(),
                "last_error": null,
                "result": {
                    "binary_path": "/tmp/pi",
                    "version": "pi 0.4.2",
                    "compatible": true,
                    "help_surface_tokens_present": ["--mode"],
                    "help_surface_tokens_missing": [],
                    "error": null
                }
            })
            .to_string(),
        )
        .unwrap();

        let outcome = probe_cached_impl(
            false,
            crate::models::probes::ProbeRefreshMode::Background,
            &Some(path),
            || PiProbeResult {
                model_slugs: HashSet::from(["openai/gpt-5.5".to_string()]),
                ..compatible_result()
            },
            || Ok(()),
        );

        let CachedPiProbeOutcome::Miss(result) = outcome else {
            panic!("legacy cache entries without model slug capability must trigger a fresh probe");
        };
        // The slug only exists in the stand-in probe's output, proving it ran.
        assert!(result.model_slugs.contains("openai/gpt-5.5"));
    }

    /// An entry stamped "now" with a usable result is served as a hit; the
    /// stand-in probe would have produced an error result instead.
    #[test]
    fn fresh_hit_returns_cached_result() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        write_entry(&path, &entry(now_unix_secs(), Some(compatible_result())));

        let outcome = probe_cached_impl(
            false,
            crate::models::probes::ProbeRefreshMode::Background,
            &Some(path),
            incompatible_result,
            || Ok(()),
        );
        assert!(matches!(outcome, CachedPiProbeOutcome::Hit(_)));
    }

    /// An entry stamped at Unix second 1 is reported as stale in background
    /// refresh mode.
    /// NOTE(review): if the resolver invokes the refresh trigger here, that
    /// trigger uses the process-wide lock and cache paths rather than the
    /// temporary directory — confirm whether that is intended.
    #[test]
    fn stale_entry_returns_stale_result() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        write_entry(&path, &entry(1, Some(compatible_result())));

        let outcome = probe_cached_impl(
            false,
            crate::models::probes::ProbeRefreshMode::Background,
            &Some(path),
            incompatible_result,
            || Ok(()),
        );
        assert!(matches!(outcome, CachedPiProbeOutcome::Stale(_)));
    }

    /// With no cache file on disk the probe runs and the outcome is a miss.
    #[test]
    fn missing_cache_runs_probe() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        let outcome = probe_cached_impl(
            false,
            crate::models::probes::ProbeRefreshMode::Background,
            &Some(path),
            compatible_result,
            || Ok(()),
        );
        assert!(matches!(outcome, CachedPiProbeOutcome::Miss(_)));
    }
}
469}