Skip to main content

mars_agents/models/probes/
pi_cache.rs

1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use serde::{Deserialize, Serialize};
6
7use super::pi::PiProbeResult;
8use crate::error::MarsError;
9
/// Version tag written into every cache entry; entries read back with a
/// different `schema_version` are ignored by `read_cache_tolerant_at`.
const SCHEMA_VERSION: u32 = 2;
/// Freshness window in seconds, used by `ttl_secs` when the
/// `MARS_PROBE_CACHE_TTL_SECS` environment variable is unset or not a `u64`.
const DEFAULT_TTL_SECS: u64 = 60;
12
/// One persisted record of a `pi` harness probe, stored as JSON in the
/// availability cache (`pi.json`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PiProbeCacheEntry {
    /// Layout version; readers reject entries whose value differs from `SCHEMA_VERSION`.
    pub schema_version: u32,
    /// Harness the entry belongs to; readers require `"pi"` (ASCII case-insensitive).
    pub harness: String,
    /// Unix timestamp (seconds) the entry was written; `is_fresh` measures age from it.
    pub fetched_at: u64,
    /// Unix timestamp (seconds) of the probe attempt that produced this entry.
    pub last_attempt_at: u64,
    /// Copy of the probe result's `error` field at write time, if any.
    pub last_error: Option<String>,
    /// The probe result itself; an entry is only served from cache when this is
    /// present and carries no error.
    pub result: Option<PiProbeResult>,
}
22
/// Outcome of a cache-aware `pi` probe, tagged with how the result was obtained.
#[derive(Debug, Clone)]
pub enum CachedPiProbeOutcome {
    /// A usable cached result that is still within the TTL.
    Hit(PiProbeResult),
    /// A usable cached result that is older than the TTL, served as-is.
    Stale(PiProbeResult),
    /// A result produced by running the probe synchronously.
    Miss(PiProbeResult),
    /// No result: probing was skipped, or nothing usable was cached while offline.
    Unavailable,
}
30
31impl CachedPiProbeOutcome {
32    pub fn result(&self) -> Option<&PiProbeResult> {
33        match self {
34            Self::Hit(r) | Self::Stale(r) | Self::Miss(r) => Some(r),
35            Self::Unavailable => None,
36        }
37    }
38
39    pub fn cache_status(&self) -> &'static str {
40        match self {
41            Self::Hit(_) => "hit",
42            Self::Stale(_) => "stale",
43            Self::Miss(_) => "miss",
44            Self::Unavailable => "skipped",
45        }
46    }
47}
48
/// Decides whether the `pi` harness should be probed at all.
///
/// Probing is skipped when running offline, and otherwise requires the exact
/// key `"pi"` to be present in `installed`.
fn should_probe_pi(installed: &HashSet<String>, is_offline: bool) -> bool {
    if is_offline {
        return false;
    }
    installed.contains("pi")
}
52
53fn cache_dir() -> Result<PathBuf, MarsError> {
54    let root = crate::platform::cache::global_cache_root()?;
55    Ok(root.join("availability"))
56}
57
58fn cache_path() -> Result<PathBuf, MarsError> {
59    Ok(cache_dir()?.join("pi.json"))
60}
61
62fn lock_path() -> Result<PathBuf, MarsError> {
63    Ok(cache_dir()?.join(".pi.lock"))
64}
65
66fn ttl_secs() -> u64 {
67    std::env::var("MARS_PROBE_CACHE_TTL_SECS")
68        .ok()
69        .and_then(|v| v.parse::<u64>().ok())
70        .unwrap_or(DEFAULT_TTL_SECS)
71}
72
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, returns `0` rather
/// than failing.
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
79
80fn is_fresh(entry: &PiProbeCacheEntry) -> bool {
81    let ttl = ttl_secs();
82    let now = now_unix_secs();
83    if entry.fetched_at > now {
84        return false;
85    }
86    (now - entry.fetched_at) < ttl
87}
88
89fn read_cache_tolerant() -> Option<PiProbeCacheEntry> {
90    read_cache_tolerant_at(&cache_path().ok()?)
91}
92
93fn read_cache_tolerant_at(path: &Path) -> Option<PiProbeCacheEntry> {
94    let content = std::fs::read_to_string(path).ok()?;
95    let entry: PiProbeCacheEntry = serde_json::from_str(&content).ok()?;
96    if entry.schema_version != SCHEMA_VERSION {
97        return None;
98    }
99    if !entry.harness.eq_ignore_ascii_case("pi") {
100        return None;
101    }
102    Some(entry)
103}
104
105fn write_cache_at(path: &Path, entry: &PiProbeCacheEntry) -> Result<(), MarsError> {
106    let json = serde_json::to_string_pretty(entry)
107        .map_err(|e| MarsError::Internal(format!("pi probe cache serialize: {e}")))?;
108    crate::fs::atomic_write(path, json.as_bytes())
109}
110
/// Guard returned by `lock_at`: owns the open lock file for as long as the
/// guard is alive.
struct FileLock {
    // Never read; held only so the file handle stays open until the guard is dropped.
    _file: std::fs::File,
}
114
115fn try_lock() -> Option<FileLock> {
116    lock_at(&lock_path().ok()?, true)
117}
118
119fn blocking_lock() -> Option<FileLock> {
120    lock_at(&lock_path().ok()?, false)
121}
122
/// Opens (creating it and its parent directory if needed) the lock file at
/// `path` and takes an exclusive OS-level lock on it.
///
/// With `nonblocking` set, the call fails immediately instead of waiting when
/// the lock is already held. Any failure — creating the directory, opening the
/// file, or acquiring the lock — yields `None`.
///
/// On targets that are neither `unix` nor `windows` no lock call is made and
/// the opened file is returned as-is.
fn lock_at(path: &Path, nonblocking: bool) -> Option<FileLock> {
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).ok()?;
    }
    // `truncate(false)`: the file only serves as a lock target, its contents are irrelevant.
    let file = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(false)
        .open(path)
        .ok()?;

    #[cfg(unix)]
    {
        use std::os::unix::io::AsRawFd;
        let flags = if nonblocking {
            libc::LOCK_EX | libc::LOCK_NB
        } else {
            libc::LOCK_EX
        };
        // SAFETY: the descriptor comes from `file`, which is open and outlives this call.
        let ret = unsafe { libc::flock(file.as_raw_fd(), flags) };
        // Any non-zero return (including "would block") is treated as "not locked".
        if ret != 0 {
            return None;
        }
    }

    #[cfg(windows)]
    {
        use std::os::windows::io::AsRawHandle;
        use windows_sys::Win32::Foundation::HANDLE;
        use windows_sys::Win32::Storage::FileSystem::{
            LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
        };
        let handle = file.as_raw_handle() as HANDLE;
        // SAFETY: the OVERLAPPED argument is a plain C struct for which all-zero
        // bytes are a valid value (offset 0, no event).
        let mut overlapped = unsafe { std::mem::zeroed() };
        let flags = if nonblocking {
            LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY
        } else {
            LOCKFILE_EXCLUSIVE_LOCK
        };
        // SAFETY: `handle` belongs to `file`, which is open for the duration of the
        // call, and `overlapped` is a live, initialized local. Locks a 1-byte range.
        let ret = unsafe { LockFileEx(handle, flags, 0, 1, 0, &mut overlapped) };
        // LockFileEx reports failure with a zero return.
        if ret == 0 {
            return None;
        }
    }

    // NOTE(review): the lock is presumably released by the OS when this file
    // handle is closed on drop — confirm for both platforms.
    Some(FileLock { _file: file })
}
170
171pub fn probe_cached(installed: &HashSet<String>, is_offline: bool) -> CachedPiProbeOutcome {
172    if !should_probe_pi(installed, is_offline) {
173        return CachedPiProbeOutcome::Unavailable;
174    }
175
176    probe_cached_impl(is_offline, &cache_path().ok(), super::pi::probe, || {
177        spawn_detached_refresh().map_err(|_| ())
178    })
179}
180
181fn probe_cached_impl<F, S>(
182    is_offline: bool,
183    path: &Option<PathBuf>,
184    probe: F,
185    spawn_refresh: S,
186) -> CachedPiProbeOutcome
187where
188    F: Fn() -> PiProbeResult,
189    S: Fn() -> Result<(), ()>,
190{
191    let cached = path.as_deref().and_then(read_cache_tolerant_at);
192
193    match cached {
194        Some(entry) if is_fresh(&entry) && is_usable_result(entry.result.as_ref()) => {
195            CachedPiProbeOutcome::Hit(entry.result.unwrap())
196        }
197        Some(entry) if is_usable_result(entry.result.as_ref()) => {
198            let result = entry.result.clone().unwrap();
199            if !is_offline {
200                trigger_background_refresh_with(spawn_refresh);
201            }
202            CachedPiProbeOutcome::Stale(result)
203        }
204        _ if is_offline => CachedPiProbeOutcome::Unavailable,
205        _ => synchronous_probe_with(path, probe),
206    }
207}
208
209fn trigger_background_refresh_with<S>(spawn_refresh: S)
210where
211    S: Fn() -> Result<(), ()>,
212{
213    let Some(lock) = try_lock() else { return };
214    if let Some(entry) = read_cache_tolerant()
215        && is_fresh(&entry)
216        && is_usable_result(entry.result.as_ref())
217    {
218        drop(lock);
219        return;
220    }
221    let _ = spawn_refresh();
222    drop(lock);
223}
224
225fn synchronous_probe_with<F>(path: &Option<PathBuf>, probe: F) -> CachedPiProbeOutcome
226where
227    F: Fn() -> PiProbeResult,
228{
229    let lock = blocking_lock();
230
231    if lock.is_some()
232        && let Some(path) = path
233        && let Some(entry) = read_cache_tolerant_at(path)
234        && is_usable_result(entry.result.as_ref())
235    {
236        if is_fresh(&entry) {
237            return CachedPiProbeOutcome::Hit(entry.result.unwrap());
238        }
239
240        let probe_result = probe();
241        write_probe_attempt(path, probe_result.clone());
242        return CachedPiProbeOutcome::Miss(probe_result);
243    }
244
245    let probe_result = probe();
246    if let Some(path) = path {
247        write_probe_attempt(path, probe_result.clone());
248    }
249    drop(lock);
250
251    CachedPiProbeOutcome::Miss(probe_result)
252}
253
254fn write_probe_attempt(path: &Path, probe_result: PiProbeResult) {
255    let now = now_unix_secs();
256    let entry = PiProbeCacheEntry {
257        schema_version: SCHEMA_VERSION,
258        harness: "pi".to_string(),
259        fetched_at: now,
260        last_attempt_at: now,
261        last_error: probe_result.error.clone(),
262        result: Some(probe_result),
263    };
264
265    if let Err(e) = write_cache_at(path, &entry) {
266        eprintln!("debug: pi probe cache write failed: {e}");
267    }
268}
269
270fn spawn_detached_refresh() -> std::io::Result<()> {
271    let mars_bin = std::env::current_exe()?;
272    let mut cmd = std::process::Command::new(mars_bin);
273    cmd.args(["models", "__refresh-probe", "--target", "pi"]);
274    cmd.stdin(Stdio::null());
275    cmd.stdout(Stdio::null());
276    cmd.stderr(Stdio::null());
277
278    #[cfg(unix)]
279    {
280        use std::os::unix::process::CommandExt;
281        unsafe {
282            cmd.pre_exec(|| {
283                libc::setsid();
284                Ok(())
285            });
286        }
287    }
288
289    #[cfg(windows)]
290    {
291        use std::os::windows::process::CommandExt;
292        cmd.creation_flags(0x00000008);
293    }
294
295    cmd.spawn()?;
296    Ok(())
297}
298
299pub fn run_refresh_probe_command() -> Result<i32, MarsError> {
300    let Some(_lock) = blocking_lock() else {
301        return Ok(0);
302    };
303
304    if let Some(entry) = read_cache_tolerant()
305        && is_fresh(&entry)
306        && is_usable_result(entry.result.as_ref())
307    {
308        return Ok(0);
309    }
310
311    let probe_result = super::pi::probe();
312    if let Ok(path) = cache_path() {
313        write_probe_attempt(&path, probe_result);
314    }
315
316    Ok(0)
317}
318
319fn is_usable_result(result: Option<&PiProbeResult>) -> bool {
320    result.is_some_and(|probe| probe.error.is_none())
321}
322
// Unit tests for the cache decision logic in `probe_cached_impl`, using a
// temporary directory for the cache file and closures in place of the real
// probe and background refresher.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A successful probe result (`error: None`), i.e. one the cache treats as usable.
    fn compatible_result() -> PiProbeResult {
        PiProbeResult {
            binary_path: "/tmp/pi".to_string(),
            version: Some("pi 0.4.2".to_string()),
            compatible: true,
            help_surface_tokens_present: vec!["--mode".to_string()],
            help_surface_tokens_missing: Vec::new(),
            model_slugs: HashSet::from(["openai/gpt-5.4".to_string()]),
            error: None,
        }
    }

    /// A failed probe result (`error: Some(..)`), i.e. one the cache treats as unusable.
    fn incompatible_result() -> PiProbeResult {
        PiProbeResult {
            compatible: false,
            help_surface_tokens_missing: vec!["--mode".to_string()],
            error: Some("missing help tokens".to_string()),
            ..PiProbeResult::default()
        }
    }

    /// Builds a current-schema `pi` entry whose timestamps are both `fetched_at`.
    fn entry(fetched_at: u64, result: Option<PiProbeResult>) -> PiProbeCacheEntry {
        PiProbeCacheEntry {
            schema_version: SCHEMA_VERSION,
            harness: "pi".to_string(),
            fetched_at,
            last_attempt_at: fetched_at,
            last_error: None,
            result,
        }
    }

    /// Cache file location inside the temp dir, mirroring the `availability/pi.json` layout.
    fn cache_file(temp: &TempDir) -> PathBuf {
        temp.path().join("availability").join("pi.json")
    }

    /// Writes `entry` to `path`, panicking on failure.
    fn write_entry(path: &Path, entry: &PiProbeCacheEntry) {
        write_cache_at(path, entry).unwrap();
    }

    // A schema-version-1 file (no `model_slugs`) must be ignored even though its
    // timestamp is current, so the probe closure runs and its result is returned.
    #[test]
    fn legacy_v1_cache_without_model_slugs_is_reprobed() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(
            &path,
            serde_json::json!({
                "schema_version": 1,
                "harness": "pi",
                "fetched_at": now_unix_secs(),
                "last_attempt_at": now_unix_secs(),
                "last_error": null,
                "result": {
                    "binary_path": "/tmp/pi",
                    "version": "pi 0.4.2",
                    "compatible": true,
                    "help_surface_tokens_present": ["--mode"],
                    "help_surface_tokens_missing": [],
                    "error": null
                }
            })
            .to_string(),
        )
        .unwrap();

        let outcome = probe_cached_impl(
            false,
            &Some(path),
            || PiProbeResult {
                model_slugs: HashSet::from(["openai/gpt-5.5".to_string()]),
                ..compatible_result()
            },
            || Ok(()),
        );

        let CachedPiProbeOutcome::Miss(result) = outcome else {
            panic!("legacy cache entries without model slug capability must trigger a fresh probe");
        };
        // The slug only exists in the probe closure's result, proving the probe ran.
        assert!(result.model_slugs.contains("openai/gpt-5.5"));
    }

    // An entry written "now" with a usable result is served as a hit; the failing
    // probe closure passed here is never consulted on that path.
    #[test]
    fn fresh_hit_returns_cached_result() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        write_entry(&path, &entry(now_unix_secs(), Some(compatible_result())));

        let outcome = probe_cached_impl(false, &Some(path), incompatible_result, || Ok(()));
        assert!(matches!(outcome, CachedPiProbeOutcome::Hit(_)));
    }

    // `fetched_at = 1` is far older than any TTL, so the usable entry is stale.
    // NOTE(review): this path calls `trigger_background_refresh_with`, which uses
    // the global lock and cache paths rather than the temp dir.
    #[test]
    fn stale_entry_returns_stale_result() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        write_entry(&path, &entry(1, Some(compatible_result())));

        let outcome = probe_cached_impl(false, &Some(path), incompatible_result, || Ok(()));
        assert!(matches!(outcome, CachedPiProbeOutcome::Stale(_)));
    }

    // With no cache file at all (and not offline) the probe runs synchronously.
    #[test]
    fn missing_cache_runs_probe() {
        let temp = TempDir::new().unwrap();
        let path = cache_file(&temp);
        let outcome = probe_cached_impl(false, &Some(path), compatible_result, || Ok(()));
        assert!(matches!(outcome, CachedPiProbeOutcome::Miss(_)));
    }
}