mars_agents/models/probes/
pi_cache.rs1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use serde::{Deserialize, Serialize};
6
7use super::pi::PiProbeResult;
8use super::probe_refresh::ProbeCacheBranch;
9use crate::error::MarsError;
10
11const SCHEMA_VERSION: u32 = 2;
12const DEFAULT_TTL_SECS: u64 = 60;
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct PiProbeCacheEntry {
16 pub schema_version: u32,
17 pub harness: String,
18 pub fetched_at: u64,
19 pub last_attempt_at: u64,
20 pub last_error: Option<String>,
21 pub result: Option<PiProbeResult>,
22}
23
24#[derive(Debug, Clone)]
25pub enum CachedPiProbeOutcome {
26 Hit(PiProbeResult),
27 Stale(PiProbeResult),
28 Miss(PiProbeResult),
29 Unavailable,
30}
31
32impl CachedPiProbeOutcome {
33 pub fn result(&self) -> Option<&PiProbeResult> {
34 match self {
35 Self::Hit(r) | Self::Stale(r) | Self::Miss(r) => Some(r),
36 Self::Unavailable => None,
37 }
38 }
39
40 pub fn cache_status(&self) -> &'static str {
41 match self {
42 Self::Hit(_) => "hit",
43 Self::Stale(_) => "stale",
44 Self::Miss(_) => "miss",
45 Self::Unavailable => "skipped",
46 }
47 }
48}
49
50fn should_probe_pi(installed: &HashSet<String>, is_offline: bool) -> bool {
51 !is_offline && installed.contains("pi")
52}
53
54fn cache_dir() -> Result<PathBuf, MarsError> {
55 let root = crate::platform::cache::global_cache_root()?;
56 Ok(root.join("availability"))
57}
58
59fn cache_path() -> Result<PathBuf, MarsError> {
60 Ok(cache_dir()?.join("pi.json"))
61}
62
63fn lock_path() -> Result<PathBuf, MarsError> {
64 Ok(cache_dir()?.join(".pi.lock"))
65}
66
67fn ttl_secs() -> u64 {
68 std::env::var("MARS_PROBE_CACHE_TTL_SECS")
69 .ok()
70 .and_then(|v| v.parse::<u64>().ok())
71 .unwrap_or(DEFAULT_TTL_SECS)
72}
73
74fn now_unix_secs() -> u64 {
75 std::time::SystemTime::now()
76 .duration_since(std::time::UNIX_EPOCH)
77 .unwrap_or_default()
78 .as_secs()
79}
80
81fn is_fresh(entry: &PiProbeCacheEntry) -> bool {
82 let ttl = ttl_secs();
83 let now = now_unix_secs();
84 if entry.fetched_at > now {
85 return false;
86 }
87 (now - entry.fetched_at) < ttl
88}
89
90fn read_cache_tolerant() -> Option<PiProbeCacheEntry> {
91 read_cache_tolerant_at(&cache_path().ok()?)
92}
93
94fn read_cache_tolerant_at(path: &Path) -> Option<PiProbeCacheEntry> {
95 let content = std::fs::read_to_string(path).ok()?;
96 let entry: PiProbeCacheEntry = serde_json::from_str(&content).ok()?;
97 if entry.schema_version != SCHEMA_VERSION {
98 return None;
99 }
100 if !entry.harness.eq_ignore_ascii_case("pi") {
101 return None;
102 }
103 Some(entry)
104}
105
106fn write_cache_at(path: &Path, entry: &PiProbeCacheEntry) -> Result<(), MarsError> {
107 let json = serde_json::to_string_pretty(entry)
108 .map_err(|e| MarsError::Internal(format!("pi probe cache serialize: {e}")))?;
109 crate::fs::atomic_write(path, json.as_bytes())
110}
111
112struct FileLock {
113 _file: std::fs::File,
114}
115
116fn try_lock() -> Option<FileLock> {
117 lock_at(&lock_path().ok()?, true)
118}
119
120fn blocking_lock() -> Option<FileLock> {
121 lock_at(&lock_path().ok()?, false)
122}
123
124fn lock_at(path: &Path, nonblocking: bool) -> Option<FileLock> {
125 if let Some(parent) = path.parent() {
126 std::fs::create_dir_all(parent).ok()?;
127 }
128 let file = std::fs::OpenOptions::new()
129 .create(true)
130 .write(true)
131 .truncate(false)
132 .open(path)
133 .ok()?;
134
135 #[cfg(unix)]
136 {
137 use std::os::unix::io::AsRawFd;
138 let flags = if nonblocking {
139 libc::LOCK_EX | libc::LOCK_NB
140 } else {
141 libc::LOCK_EX
142 };
143 let ret = unsafe { libc::flock(file.as_raw_fd(), flags) };
144 if ret != 0 {
145 return None;
146 }
147 }
148
149 #[cfg(windows)]
150 {
151 use std::os::windows::io::AsRawHandle;
152 use windows_sys::Win32::Foundation::HANDLE;
153 use windows_sys::Win32::Storage::FileSystem::{
154 LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
155 };
156 let handle = file.as_raw_handle() as HANDLE;
157 let mut overlapped = unsafe { std::mem::zeroed() };
158 let flags = if nonblocking {
159 LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY
160 } else {
161 LOCKFILE_EXCLUSIVE_LOCK
162 };
163 let ret = unsafe { LockFileEx(handle, flags, 0, 1, 0, &mut overlapped) };
164 if ret == 0 {
165 return None;
166 }
167 }
168
169 Some(FileLock { _file: file })
170}
171
172pub fn probe_cached(
173 installed: &HashSet<String>,
174 mars_offline: bool,
175 probe_refresh: super::ProbeRefreshMode,
176) -> CachedPiProbeOutcome {
177 if !should_probe_pi(installed, mars_offline) {
178 return CachedPiProbeOutcome::Unavailable;
179 }
180
181 probe_cached_impl(
182 mars_offline,
183 probe_refresh,
184 &cache_path().ok(),
185 super::pi::probe,
186 || spawn_detached_refresh().map_err(|_| ()),
187 )
188}
189
190fn probe_cached_impl<F, S>(
191 mars_offline: bool,
192 probe_refresh: super::ProbeRefreshMode,
193 path: &Option<PathBuf>,
194 probe: F,
195 spawn_refresh: S,
196) -> CachedPiProbeOutcome
197where
198 F: Fn() -> PiProbeResult,
199 S: Fn() -> Result<(), ()>,
200{
201 let cached = path.as_deref().and_then(read_cache_tolerant_at);
202 match super::probe_refresh::resolve_probe_cache_branch(
203 cached,
204 mars_offline,
205 probe_refresh,
206 |entry| {
207 entry
208 .result
209 .as_ref()
210 .filter(|result| is_usable_result(Some(*result)))
211 },
212 is_fresh,
213 || trigger_background_refresh_with(spawn_refresh),
214 ) {
215 ProbeCacheBranch::Hit(result) => CachedPiProbeOutcome::Hit(result),
216 ProbeCacheBranch::Stale(result) => CachedPiProbeOutcome::Stale(result),
217 ProbeCacheBranch::Unavailable => CachedPiProbeOutcome::Unavailable,
218 ProbeCacheBranch::SynchronousProbe => synchronous_probe_with(path, probe),
219 }
220}
221
222fn trigger_background_refresh_with<S>(spawn_refresh: S)
223where
224 S: Fn() -> Result<(), ()>,
225{
226 let Some(lock) = try_lock() else { return };
227 if let Some(entry) = read_cache_tolerant()
228 && is_fresh(&entry)
229 && is_usable_result(entry.result.as_ref())
230 {
231 drop(lock);
232 return;
233 }
234 let _ = spawn_refresh();
235 drop(lock);
236}
237
238fn synchronous_probe_with<F>(path: &Option<PathBuf>, probe: F) -> CachedPiProbeOutcome
239where
240 F: Fn() -> PiProbeResult,
241{
242 let lock = blocking_lock();
243
244 if lock.is_some()
245 && let Some(path) = path
246 && let Some(entry) = read_cache_tolerant_at(path)
247 && is_usable_result(entry.result.as_ref())
248 {
249 if is_fresh(&entry) {
250 return CachedPiProbeOutcome::Hit(entry.result.unwrap());
251 }
252
253 let probe_result = probe();
254 write_probe_attempt(path, probe_result.clone());
255 return CachedPiProbeOutcome::Miss(probe_result);
256 }
257
258 let probe_result = probe();
259 if let Some(path) = path {
260 write_probe_attempt(path, probe_result.clone());
261 }
262 drop(lock);
263
264 CachedPiProbeOutcome::Miss(probe_result)
265}
266
267fn write_probe_attempt(path: &Path, probe_result: PiProbeResult) {
268 let now = now_unix_secs();
269 let entry = PiProbeCacheEntry {
270 schema_version: SCHEMA_VERSION,
271 harness: "pi".to_string(),
272 fetched_at: now,
273 last_attempt_at: now,
274 last_error: probe_result.error.clone(),
275 result: Some(probe_result),
276 };
277
278 if let Err(e) = write_cache_at(path, &entry) {
279 eprintln!("debug: pi probe cache write failed: {e}");
280 }
281}
282
283fn spawn_detached_refresh() -> std::io::Result<()> {
284 let mars_bin = std::env::current_exe()?;
285 let mut cmd = std::process::Command::new(mars_bin);
286 cmd.args(["models", "__refresh-probe", "--target", "pi"]);
287 cmd.stdin(Stdio::null());
288 cmd.stdout(Stdio::null());
289 cmd.stderr(Stdio::null());
290
291 #[cfg(unix)]
292 {
293 use std::os::unix::process::CommandExt;
294 unsafe {
295 cmd.pre_exec(|| {
296 libc::setsid();
297 Ok(())
298 });
299 }
300 }
301
302 #[cfg(windows)]
303 {
304 use std::os::windows::process::CommandExt;
305 cmd.creation_flags(0x00000008);
306 }
307
308 cmd.spawn()?;
309 Ok(())
310}
311
312pub fn run_refresh_probe_command() -> Result<i32, MarsError> {
313 let Some(_lock) = blocking_lock() else {
314 return Ok(0);
315 };
316
317 if let Some(entry) = read_cache_tolerant()
318 && is_fresh(&entry)
319 && is_usable_result(entry.result.as_ref())
320 {
321 return Ok(0);
322 }
323
324 let probe_result = super::pi::probe();
325 if let Ok(path) = cache_path() {
326 write_probe_attempt(&path, probe_result);
327 }
328
329 Ok(0)
330}
331
332fn is_usable_result(result: Option<&PiProbeResult>) -> bool {
333 result.is_some_and(|probe| probe.error.is_none())
334}
335
336#[cfg(test)]
337mod tests {
338 use super::*;
339 use tempfile::TempDir;
340
341 fn compatible_result() -> PiProbeResult {
342 PiProbeResult {
343 binary_path: "/tmp/pi".to_string(),
344 version: Some("pi 0.4.2".to_string()),
345 compatible: true,
346 help_surface_tokens_present: vec!["--mode".to_string()],
347 help_surface_tokens_missing: Vec::new(),
348 model_slugs: HashSet::from(["openai/gpt-5.4".to_string()]),
349 error: None,
350 }
351 }
352
353 fn incompatible_result() -> PiProbeResult {
354 PiProbeResult {
355 compatible: false,
356 help_surface_tokens_missing: vec!["--mode".to_string()],
357 error: Some("missing help tokens".to_string()),
358 ..PiProbeResult::default()
359 }
360 }
361
362 fn entry(fetched_at: u64, result: Option<PiProbeResult>) -> PiProbeCacheEntry {
363 PiProbeCacheEntry {
364 schema_version: SCHEMA_VERSION,
365 harness: "pi".to_string(),
366 fetched_at,
367 last_attempt_at: fetched_at,
368 last_error: None,
369 result,
370 }
371 }
372
373 fn cache_file(temp: &TempDir) -> PathBuf {
374 temp.path().join("availability").join("pi.json")
375 }
376
377 fn write_entry(path: &Path, entry: &PiProbeCacheEntry) {
378 write_cache_at(path, entry).unwrap();
379 }
380
381 #[test]
382 fn legacy_v1_cache_without_model_slugs_is_reprobed() {
383 let temp = TempDir::new().unwrap();
384 let path = cache_file(&temp);
385 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
386 std::fs::write(
387 &path,
388 serde_json::json!({
389 "schema_version": 1,
390 "harness": "pi",
391 "fetched_at": now_unix_secs(),
392 "last_attempt_at": now_unix_secs(),
393 "last_error": null,
394 "result": {
395 "binary_path": "/tmp/pi",
396 "version": "pi 0.4.2",
397 "compatible": true,
398 "help_surface_tokens_present": ["--mode"],
399 "help_surface_tokens_missing": [],
400 "error": null
401 }
402 })
403 .to_string(),
404 )
405 .unwrap();
406
407 let outcome = probe_cached_impl(
408 false,
409 crate::models::probes::ProbeRefreshMode::Background,
410 &Some(path),
411 || PiProbeResult {
412 model_slugs: HashSet::from(["openai/gpt-5.5".to_string()]),
413 ..compatible_result()
414 },
415 || Ok(()),
416 );
417
418 let CachedPiProbeOutcome::Miss(result) = outcome else {
419 panic!("legacy cache entries without model slug capability must trigger a fresh probe");
420 };
421 assert!(result.model_slugs.contains("openai/gpt-5.5"));
422 }
423
424 #[test]
425 fn fresh_hit_returns_cached_result() {
426 let temp = TempDir::new().unwrap();
427 let path = cache_file(&temp);
428 write_entry(&path, &entry(now_unix_secs(), Some(compatible_result())));
429
430 let outcome = probe_cached_impl(
431 false,
432 crate::models::probes::ProbeRefreshMode::Background,
433 &Some(path),
434 incompatible_result,
435 || Ok(()),
436 );
437 assert!(matches!(outcome, CachedPiProbeOutcome::Hit(_)));
438 }
439
440 #[test]
441 fn stale_entry_returns_stale_result() {
442 let temp = TempDir::new().unwrap();
443 let path = cache_file(&temp);
444 write_entry(&path, &entry(1, Some(compatible_result())));
445
446 let outcome = probe_cached_impl(
447 false,
448 crate::models::probes::ProbeRefreshMode::Background,
449 &Some(path),
450 incompatible_result,
451 || Ok(()),
452 );
453 assert!(matches!(outcome, CachedPiProbeOutcome::Stale(_)));
454 }
455
456 #[test]
457 fn missing_cache_runs_probe() {
458 let temp = TempDir::new().unwrap();
459 let path = cache_file(&temp);
460 let outcome = probe_cached_impl(
461 false,
462 crate::models::probes::ProbeRefreshMode::Background,
463 &Some(path),
464 compatible_result,
465 || Ok(()),
466 );
467 assert!(matches!(outcome, CachedPiProbeOutcome::Miss(_)));
468 }
469}