mars_agents/models/probes/
pi_cache.rs1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use serde::{Deserialize, Serialize};
6
7use super::pi::PiProbeResult;
8use crate::error::MarsError;
9
10const SCHEMA_VERSION: u32 = 2;
11const DEFAULT_TTL_SECS: u64 = 60;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct PiProbeCacheEntry {
15 pub schema_version: u32,
16 pub harness: String,
17 pub fetched_at: u64,
18 pub last_attempt_at: u64,
19 pub last_error: Option<String>,
20 pub result: Option<PiProbeResult>,
21}
22
23#[derive(Debug, Clone)]
24pub enum CachedPiProbeOutcome {
25 Hit(PiProbeResult),
26 Stale(PiProbeResult),
27 Miss(PiProbeResult),
28 Unavailable,
29}
30
31impl CachedPiProbeOutcome {
32 pub fn result(&self) -> Option<&PiProbeResult> {
33 match self {
34 Self::Hit(r) | Self::Stale(r) | Self::Miss(r) => Some(r),
35 Self::Unavailable => None,
36 }
37 }
38
39 pub fn cache_status(&self) -> &'static str {
40 match self {
41 Self::Hit(_) => "hit",
42 Self::Stale(_) => "stale",
43 Self::Miss(_) => "miss",
44 Self::Unavailable => "skipped",
45 }
46 }
47}
48
49fn should_probe_pi(installed: &HashSet<String>, is_offline: bool) -> bool {
50 !is_offline && installed.contains("pi")
51}
52
53fn cache_dir() -> Result<PathBuf, MarsError> {
54 let root = crate::platform::cache::global_cache_root()?;
55 Ok(root.join("availability"))
56}
57
58fn cache_path() -> Result<PathBuf, MarsError> {
59 Ok(cache_dir()?.join("pi.json"))
60}
61
62fn lock_path() -> Result<PathBuf, MarsError> {
63 Ok(cache_dir()?.join(".pi.lock"))
64}
65
66fn ttl_secs() -> u64 {
67 std::env::var("MARS_PROBE_CACHE_TTL_SECS")
68 .ok()
69 .and_then(|v| v.parse::<u64>().ok())
70 .unwrap_or(DEFAULT_TTL_SECS)
71}
72
73fn now_unix_secs() -> u64 {
74 std::time::SystemTime::now()
75 .duration_since(std::time::UNIX_EPOCH)
76 .unwrap_or_default()
77 .as_secs()
78}
79
80fn is_fresh(entry: &PiProbeCacheEntry) -> bool {
81 let ttl = ttl_secs();
82 let now = now_unix_secs();
83 if entry.fetched_at > now {
84 return false;
85 }
86 (now - entry.fetched_at) < ttl
87}
88
89fn read_cache_tolerant() -> Option<PiProbeCacheEntry> {
90 read_cache_tolerant_at(&cache_path().ok()?)
91}
92
93fn read_cache_tolerant_at(path: &Path) -> Option<PiProbeCacheEntry> {
94 let content = std::fs::read_to_string(path).ok()?;
95 let entry: PiProbeCacheEntry = serde_json::from_str(&content).ok()?;
96 if entry.schema_version != SCHEMA_VERSION {
97 return None;
98 }
99 if !entry.harness.eq_ignore_ascii_case("pi") {
100 return None;
101 }
102 Some(entry)
103}
104
105fn write_cache_at(path: &Path, entry: &PiProbeCacheEntry) -> Result<(), MarsError> {
106 let json = serde_json::to_string_pretty(entry)
107 .map_err(|e| MarsError::Internal(format!("pi probe cache serialize: {e}")))?;
108 crate::fs::atomic_write(path, json.as_bytes())
109}
110
111struct FileLock {
112 _file: std::fs::File,
113}
114
115fn try_lock() -> Option<FileLock> {
116 lock_at(&lock_path().ok()?, true)
117}
118
119fn blocking_lock() -> Option<FileLock> {
120 lock_at(&lock_path().ok()?, false)
121}
122
123fn lock_at(path: &Path, nonblocking: bool) -> Option<FileLock> {
124 if let Some(parent) = path.parent() {
125 std::fs::create_dir_all(parent).ok()?;
126 }
127 let file = std::fs::OpenOptions::new()
128 .create(true)
129 .write(true)
130 .truncate(false)
131 .open(path)
132 .ok()?;
133
134 #[cfg(unix)]
135 {
136 use std::os::unix::io::AsRawFd;
137 let flags = if nonblocking {
138 libc::LOCK_EX | libc::LOCK_NB
139 } else {
140 libc::LOCK_EX
141 };
142 let ret = unsafe { libc::flock(file.as_raw_fd(), flags) };
143 if ret != 0 {
144 return None;
145 }
146 }
147
148 #[cfg(windows)]
149 {
150 use std::os::windows::io::AsRawHandle;
151 use windows_sys::Win32::Foundation::HANDLE;
152 use windows_sys::Win32::Storage::FileSystem::{
153 LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
154 };
155 let handle = file.as_raw_handle() as HANDLE;
156 let mut overlapped = unsafe { std::mem::zeroed() };
157 let flags = if nonblocking {
158 LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY
159 } else {
160 LOCKFILE_EXCLUSIVE_LOCK
161 };
162 let ret = unsafe { LockFileEx(handle, flags, 0, 1, 0, &mut overlapped) };
163 if ret == 0 {
164 return None;
165 }
166 }
167
168 Some(FileLock { _file: file })
169}
170
171pub fn probe_cached(installed: &HashSet<String>, is_offline: bool) -> CachedPiProbeOutcome {
172 if !should_probe_pi(installed, is_offline) {
173 return CachedPiProbeOutcome::Unavailable;
174 }
175
176 probe_cached_impl(is_offline, &cache_path().ok(), super::pi::probe, || {
177 spawn_detached_refresh().map_err(|_| ())
178 })
179}
180
181fn probe_cached_impl<F, S>(
182 is_offline: bool,
183 path: &Option<PathBuf>,
184 probe: F,
185 spawn_refresh: S,
186) -> CachedPiProbeOutcome
187where
188 F: Fn() -> PiProbeResult,
189 S: Fn() -> Result<(), ()>,
190{
191 let cached = path.as_deref().and_then(read_cache_tolerant_at);
192
193 match cached {
194 Some(entry) if is_fresh(&entry) && is_usable_result(entry.result.as_ref()) => {
195 CachedPiProbeOutcome::Hit(entry.result.unwrap())
196 }
197 Some(entry) if is_usable_result(entry.result.as_ref()) => {
198 let result = entry.result.clone().unwrap();
199 if !is_offline {
200 trigger_background_refresh_with(spawn_refresh);
201 }
202 CachedPiProbeOutcome::Stale(result)
203 }
204 _ if is_offline => CachedPiProbeOutcome::Unavailable,
205 _ => synchronous_probe_with(path, probe),
206 }
207}
208
209fn trigger_background_refresh_with<S>(spawn_refresh: S)
210where
211 S: Fn() -> Result<(), ()>,
212{
213 let Some(lock) = try_lock() else { return };
214 if let Some(entry) = read_cache_tolerant()
215 && is_fresh(&entry)
216 && is_usable_result(entry.result.as_ref())
217 {
218 drop(lock);
219 return;
220 }
221 let _ = spawn_refresh();
222 drop(lock);
223}
224
225fn synchronous_probe_with<F>(path: &Option<PathBuf>, probe: F) -> CachedPiProbeOutcome
226where
227 F: Fn() -> PiProbeResult,
228{
229 let lock = blocking_lock();
230
231 if lock.is_some()
232 && let Some(path) = path
233 && let Some(entry) = read_cache_tolerant_at(path)
234 && is_usable_result(entry.result.as_ref())
235 {
236 if is_fresh(&entry) {
237 return CachedPiProbeOutcome::Hit(entry.result.unwrap());
238 }
239
240 let probe_result = probe();
241 write_probe_attempt(path, probe_result.clone());
242 return CachedPiProbeOutcome::Miss(probe_result);
243 }
244
245 let probe_result = probe();
246 if let Some(path) = path {
247 write_probe_attempt(path, probe_result.clone());
248 }
249 drop(lock);
250
251 CachedPiProbeOutcome::Miss(probe_result)
252}
253
254fn write_probe_attempt(path: &Path, probe_result: PiProbeResult) {
255 let now = now_unix_secs();
256 let entry = PiProbeCacheEntry {
257 schema_version: SCHEMA_VERSION,
258 harness: "pi".to_string(),
259 fetched_at: now,
260 last_attempt_at: now,
261 last_error: probe_result.error.clone(),
262 result: Some(probe_result),
263 };
264
265 if let Err(e) = write_cache_at(path, &entry) {
266 eprintln!("debug: pi probe cache write failed: {e}");
267 }
268}
269
270fn spawn_detached_refresh() -> std::io::Result<()> {
271 let mars_bin = std::env::current_exe()?;
272 let mut cmd = std::process::Command::new(mars_bin);
273 cmd.args(["models", "__refresh-probe", "--target", "pi"]);
274 cmd.stdin(Stdio::null());
275 cmd.stdout(Stdio::null());
276 cmd.stderr(Stdio::null());
277
278 #[cfg(unix)]
279 {
280 use std::os::unix::process::CommandExt;
281 unsafe {
282 cmd.pre_exec(|| {
283 libc::setsid();
284 Ok(())
285 });
286 }
287 }
288
289 #[cfg(windows)]
290 {
291 use std::os::windows::process::CommandExt;
292 cmd.creation_flags(0x00000008);
293 }
294
295 cmd.spawn()?;
296 Ok(())
297}
298
299pub fn run_refresh_probe_command() -> Result<i32, MarsError> {
300 let Some(_lock) = blocking_lock() else {
301 return Ok(0);
302 };
303
304 if let Some(entry) = read_cache_tolerant()
305 && is_fresh(&entry)
306 && is_usable_result(entry.result.as_ref())
307 {
308 return Ok(0);
309 }
310
311 let probe_result = super::pi::probe();
312 if let Ok(path) = cache_path() {
313 write_probe_attempt(&path, probe_result);
314 }
315
316 Ok(0)
317}
318
319fn is_usable_result(result: Option<&PiProbeResult>) -> bool {
320 result.is_some_and(|probe| probe.error.is_none())
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326 use tempfile::TempDir;
327
328 fn compatible_result() -> PiProbeResult {
329 PiProbeResult {
330 binary_path: "/tmp/pi".to_string(),
331 version: Some("pi 0.4.2".to_string()),
332 compatible: true,
333 help_surface_tokens_present: vec!["--mode".to_string()],
334 help_surface_tokens_missing: Vec::new(),
335 model_slugs: HashSet::from(["openai/gpt-5.4".to_string()]),
336 error: None,
337 }
338 }
339
340 fn incompatible_result() -> PiProbeResult {
341 PiProbeResult {
342 compatible: false,
343 help_surface_tokens_missing: vec!["--mode".to_string()],
344 error: Some("missing help tokens".to_string()),
345 ..PiProbeResult::default()
346 }
347 }
348
349 fn entry(fetched_at: u64, result: Option<PiProbeResult>) -> PiProbeCacheEntry {
350 PiProbeCacheEntry {
351 schema_version: SCHEMA_VERSION,
352 harness: "pi".to_string(),
353 fetched_at,
354 last_attempt_at: fetched_at,
355 last_error: None,
356 result,
357 }
358 }
359
360 fn cache_file(temp: &TempDir) -> PathBuf {
361 temp.path().join("availability").join("pi.json")
362 }
363
364 fn write_entry(path: &Path, entry: &PiProbeCacheEntry) {
365 write_cache_at(path, entry).unwrap();
366 }
367
368 #[test]
369 fn legacy_v1_cache_without_model_slugs_is_reprobed() {
370 let temp = TempDir::new().unwrap();
371 let path = cache_file(&temp);
372 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
373 std::fs::write(
374 &path,
375 serde_json::json!({
376 "schema_version": 1,
377 "harness": "pi",
378 "fetched_at": now_unix_secs(),
379 "last_attempt_at": now_unix_secs(),
380 "last_error": null,
381 "result": {
382 "binary_path": "/tmp/pi",
383 "version": "pi 0.4.2",
384 "compatible": true,
385 "help_surface_tokens_present": ["--mode"],
386 "help_surface_tokens_missing": [],
387 "error": null
388 }
389 })
390 .to_string(),
391 )
392 .unwrap();
393
394 let outcome = probe_cached_impl(
395 false,
396 &Some(path),
397 || PiProbeResult {
398 model_slugs: HashSet::from(["openai/gpt-5.5".to_string()]),
399 ..compatible_result()
400 },
401 || Ok(()),
402 );
403
404 let CachedPiProbeOutcome::Miss(result) = outcome else {
405 panic!("legacy cache entries without model slug capability must trigger a fresh probe");
406 };
407 assert!(result.model_slugs.contains("openai/gpt-5.5"));
408 }
409
410 #[test]
411 fn fresh_hit_returns_cached_result() {
412 let temp = TempDir::new().unwrap();
413 let path = cache_file(&temp);
414 write_entry(&path, &entry(now_unix_secs(), Some(compatible_result())));
415
416 let outcome = probe_cached_impl(false, &Some(path), incompatible_result, || Ok(()));
417 assert!(matches!(outcome, CachedPiProbeOutcome::Hit(_)));
418 }
419
420 #[test]
421 fn stale_entry_returns_stale_result() {
422 let temp = TempDir::new().unwrap();
423 let path = cache_file(&temp);
424 write_entry(&path, &entry(1, Some(compatible_result())));
425
426 let outcome = probe_cached_impl(false, &Some(path), incompatible_result, || Ok(()));
427 assert!(matches!(outcome, CachedPiProbeOutcome::Stale(_)));
428 }
429
430 #[test]
431 fn missing_cache_runs_probe() {
432 let temp = TempDir::new().unwrap();
433 let path = cache_file(&temp);
434 let outcome = probe_cached_impl(false, &Some(path), compatible_result, || Ok(()));
435 assert!(matches!(outcome, CachedPiProbeOutcome::Miss(_)));
436 }
437}