mars_agents/models/probes/
cursor_cache.rs1use std::collections::HashSet;
2use std::path::{Path, PathBuf};
3use std::process::Stdio;
4
5use serde::{Deserialize, Serialize};
6
7use super::cursor::CursorProbeResult;
8use crate::error::MarsError;
9
10const SCHEMA_VERSION: u32 = 1;
11const DEFAULT_TTL_SECS: u64 = 60;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct ProbeCacheEntry {
15 pub schema_version: u32,
16 pub fetched_at: u64,
17 pub last_attempt_at: u64,
18 pub last_error: Option<String>,
19 pub result: Option<CursorProbeResult>,
20}
21
22#[derive(Debug, Clone)]
23pub enum CachedCursorProbeOutcome {
24 Hit(CursorProbeResult),
25 Stale(CursorProbeResult),
26 Miss(CursorProbeResult),
27 Unavailable,
28}
29
30impl CachedCursorProbeOutcome {
31 pub fn result(&self) -> Option<&CursorProbeResult> {
32 match self {
33 Self::Hit(r) | Self::Stale(r) | Self::Miss(r) => Some(r),
34 Self::Unavailable => None,
35 }
36 }
37
38 pub fn cache_status(&self) -> &'static str {
39 match self {
40 Self::Hit(_) => "hit",
41 Self::Stale(_) => "stale",
42 Self::Miss(_) => "miss",
43 Self::Unavailable => "skipped",
44 }
45 }
46}
47
48pub fn read_cached_probe_result() -> Option<CursorProbeResult> {
52 let entry = read_cache_tolerant()?;
53 if !is_fresh(&entry) || !is_usable(&entry) {
54 return None;
55 }
56 entry.result
57}
58
59pub fn read_cached_probe_result_usable() -> Option<CursorProbeResult> {
63 let entry = read_cache_tolerant()?;
64 if !is_usable(&entry) {
65 return None;
66 }
67 entry.result
68}
69
70fn cache_dir() -> Result<PathBuf, MarsError> {
71 let root = crate::platform::cache::global_cache_root()?;
72 Ok(root.join("availability"))
73}
74
75fn cache_path() -> Result<PathBuf, MarsError> {
76 Ok(cache_dir()?.join("cursor-probe.json"))
77}
78
79fn lock_path() -> Result<PathBuf, MarsError> {
80 Ok(cache_dir()?.join(".cursor-probe.lock"))
81}
82
83fn ttl_secs() -> u64 {
84 std::env::var("MARS_PROBE_CACHE_TTL_SECS")
85 .ok()
86 .and_then(|v| v.parse::<u64>().ok())
87 .unwrap_or(DEFAULT_TTL_SECS)
88}
89
90fn now_unix_secs() -> u64 {
91 std::time::SystemTime::now()
92 .duration_since(std::time::UNIX_EPOCH)
93 .unwrap_or_default()
94 .as_secs()
95}
96
97fn is_fresh(entry: &ProbeCacheEntry) -> bool {
98 let ttl = ttl_secs();
99 let now = now_unix_secs();
100 if entry.fetched_at > now {
101 return false;
102 }
103 (now - entry.fetched_at) < ttl
104}
105
106fn is_usable(entry: &ProbeCacheEntry) -> bool {
107 entry.result.as_ref().is_some_and(|r| r.model_probe_success)
108}
109
110fn read_cache_tolerant() -> Option<ProbeCacheEntry> {
111 read_cache_tolerant_at(&cache_path().ok()?)
112}
113
114fn read_cache_tolerant_at(path: &Path) -> Option<ProbeCacheEntry> {
115 let content = std::fs::read_to_string(path).ok()?;
116 let entry: ProbeCacheEntry = serde_json::from_str(&content).ok()?;
117 if entry.schema_version != SCHEMA_VERSION {
118 return None;
119 }
120 Some(entry)
121}
122
123fn write_cache(entry: &ProbeCacheEntry) -> Result<(), MarsError> {
124 write_cache_at(&cache_path()?, entry)
125}
126
127fn write_cache_at(path: &Path, entry: &ProbeCacheEntry) -> Result<(), MarsError> {
128 let json = serde_json::to_string_pretty(entry)
129 .map_err(|e| MarsError::Internal(format!("probe cache serialize: {e}")))?;
130 crate::fs::atomic_write(path, json.as_bytes())
131}
132
133struct FileLock {
134 _file: std::fs::File,
135}
136
137fn try_lock() -> Option<FileLock> {
138 lock_at(&lock_path().ok()?, true)
139}
140
141fn blocking_lock() -> Option<FileLock> {
142 lock_at(&lock_path().ok()?, false)
143}
144
145fn lock_at(path: &Path, nonblocking: bool) -> Option<FileLock> {
146 if let Some(parent) = path.parent() {
147 std::fs::create_dir_all(parent).ok()?;
148 }
149 let file = std::fs::OpenOptions::new()
150 .create(true)
151 .write(true)
152 .truncate(false)
153 .open(path)
154 .ok()?;
155
156 #[cfg(unix)]
157 {
158 use std::os::unix::io::AsRawFd;
159 let flags = if nonblocking {
160 libc::LOCK_EX | libc::LOCK_NB
161 } else {
162 libc::LOCK_EX
163 };
164 let ret = unsafe { libc::flock(file.as_raw_fd(), flags) };
165 if ret != 0 {
166 return None;
167 }
168 }
169
170 #[cfg(windows)]
171 {
172 use std::os::windows::io::AsRawHandle;
173 use windows_sys::Win32::Foundation::HANDLE;
174 use windows_sys::Win32::Storage::FileSystem::{
175 LOCKFILE_EXCLUSIVE_LOCK, LOCKFILE_FAIL_IMMEDIATELY, LockFileEx,
176 };
177 let handle = file.as_raw_handle() as HANDLE;
178 let mut overlapped = unsafe { std::mem::zeroed() };
179 let flags = if nonblocking {
180 LOCKFILE_EXCLUSIVE_LOCK | LOCKFILE_FAIL_IMMEDIATELY
181 } else {
182 LOCKFILE_EXCLUSIVE_LOCK
183 };
184 let ret = unsafe { LockFileEx(handle, flags, 0, 1, 0, &mut overlapped) };
185 if ret == 0 {
186 return None;
187 }
188 }
189
190 Some(FileLock { _file: file })
191}
192
193pub fn probe_cached(installed: &HashSet<String>, is_offline: bool) -> CachedCursorProbeOutcome {
194 if !super::should_probe_cursor(installed, is_offline) {
195 return CachedCursorProbeOutcome::Unavailable;
196 }
197
198 probe_cached_impl(is_offline, &cache_path().ok(), super::cursor::probe, || {
199 spawn_detached_refresh().map_err(|_| ())
200 })
201}
202
203fn probe_cached_impl<F, S>(
204 is_offline: bool,
205 path: &Option<PathBuf>,
206 probe: F,
207 spawn_refresh: S,
208) -> CachedCursorProbeOutcome
209where
210 F: Fn() -> CursorProbeResult,
211 S: Fn() -> Result<(), ()>,
212{
213 let cached = path.as_deref().and_then(read_cache_tolerant_at);
214
215 match cached {
216 Some(entry) if is_fresh(&entry) && is_usable(&entry) => {
217 CachedCursorProbeOutcome::Hit(entry.result.unwrap())
218 }
219 Some(entry) if is_usable(&entry) => {
220 let result = entry.result.clone().unwrap();
221 if !is_offline {
222 trigger_background_refresh_with(spawn_refresh);
223 }
224 CachedCursorProbeOutcome::Stale(result)
225 }
226 _ if is_offline => CachedCursorProbeOutcome::Unavailable,
227 _ => synchronous_probe_with(path, probe),
228 }
229}
230
231fn trigger_background_refresh_with<S>(spawn_refresh: S)
232where
233 S: Fn() -> Result<(), ()>,
234{
235 let Some(lock) = try_lock() else { return };
236 if let Some(entry) = read_cache_tolerant()
237 && is_fresh(&entry)
238 && is_usable(&entry)
239 {
240 drop(lock);
241 return;
242 }
243 let _ = spawn_refresh();
244 drop(lock);
245}
246
247fn synchronous_probe_with<F>(path: &Option<PathBuf>, probe: F) -> CachedCursorProbeOutcome
248where
249 F: Fn() -> CursorProbeResult,
250{
251 let lock = blocking_lock();
252
253 if lock.is_some()
254 && let Some(path) = path
255 && let Some(entry) = read_cache_tolerant_at(path)
256 && is_usable(&entry)
257 {
258 if is_fresh(&entry) {
259 return CachedCursorProbeOutcome::Hit(entry.result.unwrap());
260 }
261 let probe_result = probe();
262 if probe_result.model_probe_success {
263 write_probe_attempt(path, probe_result.clone());
264 return CachedCursorProbeOutcome::Miss(probe_result);
265 } else {
266 write_failed_attempt(path, &entry, &probe_result);
267 return CachedCursorProbeOutcome::Stale(entry.result.unwrap());
268 }
269 }
270
271 let probe_result = probe();
272 if let Some(path) = path {
273 write_probe_attempt(path, probe_result.clone());
274 }
275 drop(lock);
276
277 if probe_result.model_probe_success {
278 CachedCursorProbeOutcome::Miss(probe_result)
279 } else {
280 CachedCursorProbeOutcome::Unavailable
281 }
282}
283
284fn write_probe_attempt(path: &Path, probe_result: CursorProbeResult) {
285 let now = now_unix_secs();
286 let entry = ProbeCacheEntry {
287 schema_version: SCHEMA_VERSION,
288 fetched_at: now,
289 last_attempt_at: now,
290 last_error: if probe_result.model_probe_success {
291 None
292 } else {
293 probe_result.error.clone()
294 },
295 result: Some(probe_result),
296 };
297
298 if let Err(e) = write_cache_at(path, &entry) {
299 eprintln!("debug: probe cache write failed: {e}");
300 }
301}
302
303fn write_failed_attempt(path: &Path, existing: &ProbeCacheEntry, failed_probe: &CursorProbeResult) {
304 let now = now_unix_secs();
305 let entry = ProbeCacheEntry {
306 schema_version: SCHEMA_VERSION,
307 fetched_at: existing.fetched_at,
308 last_attempt_at: now,
309 last_error: failed_probe.error.clone(),
310 result: existing.result.clone(),
311 };
312
313 if let Err(e) = write_cache_at(path, &entry) {
314 eprintln!("debug: probe cache write failed: {e}");
315 }
316}
317
318fn spawn_detached_refresh() -> std::io::Result<()> {
319 let mars_bin = std::env::current_exe()?;
320 let mut cmd = std::process::Command::new(mars_bin);
321 cmd.args(["models", "__refresh-probe", "--target", "cursor"]);
322 cmd.stdin(Stdio::null());
323 cmd.stdout(Stdio::null());
324 cmd.stderr(Stdio::null());
325
326 #[cfg(unix)]
327 {
328 use std::os::unix::process::CommandExt;
329 unsafe {
330 cmd.pre_exec(|| {
331 libc::setsid();
332 Ok(())
333 });
334 }
335 }
336
337 #[cfg(windows)]
338 {
339 use std::os::windows::process::CommandExt;
340 cmd.creation_flags(0x00000008);
341 }
342
343 cmd.spawn()?;
344 Ok(())
345}
346
347pub fn run_refresh_probe_command() -> Result<i32, MarsError> {
348 let Some(_lock) = blocking_lock() else {
349 return Ok(0);
350 };
351
352 if let Some(entry) = read_cache_tolerant()
353 && is_fresh(&entry)
354 && is_usable(&entry)
355 {
356 return Ok(0);
357 }
358
359 let probe_result = super::cursor::probe();
360 let now = now_unix_secs();
361 let existing = read_cache_tolerant();
362
363 let entry = if probe_result.model_probe_success {
364 ProbeCacheEntry {
365 schema_version: SCHEMA_VERSION,
366 fetched_at: now,
367 last_attempt_at: now,
368 last_error: None,
369 result: Some(probe_result),
370 }
371 } else {
372 ProbeCacheEntry {
373 schema_version: SCHEMA_VERSION,
374 fetched_at: existing.as_ref().map(|e| e.fetched_at).unwrap_or(0),
375 last_attempt_at: now,
376 last_error: probe_result.error.clone(),
377 result: existing.and_then(|e| e.result),
378 }
379 };
380 let _ = write_cache(&entry);
381
382 Ok(0)
383}
384
385#[cfg(test)]
386mod tests {
387 use super::*;
388 use std::cell::Cell;
389 use tempfile::TempDir;
390
391 fn ok_result() -> CursorProbeResult {
392 CursorProbeResult {
393 slugs: vec!["gpt-5.5-high".to_string()],
394 model_probe_success: true,
395 error: None,
396 }
397 }
398
399 fn fail_result() -> CursorProbeResult {
400 CursorProbeResult {
401 model_probe_success: false,
402 error: Some("boom".to_string()),
403 ..CursorProbeResult::default()
404 }
405 }
406
407 fn entry(fetched_at: u64, result: Option<CursorProbeResult>) -> ProbeCacheEntry {
408 ProbeCacheEntry {
409 schema_version: SCHEMA_VERSION,
410 fetched_at,
411 last_attempt_at: fetched_at,
412 last_error: None,
413 result,
414 }
415 }
416
417 fn cache_file(temp: &TempDir) -> PathBuf {
418 temp.path().join("availability").join("cursor-probe.json")
419 }
420
421 fn write_entry(path: &Path, entry: &ProbeCacheEntry) {
422 write_cache_at(path, entry).unwrap();
423 }
424
425 #[test]
426 fn fresh_hit_returns_cached_result() {
427 let temp = TempDir::new().unwrap();
428 let path = cache_file(&temp);
429 write_entry(&path, &entry(now_unix_secs(), Some(ok_result())));
430
431 let outcome = probe_cached_impl(false, &Some(path), fail_result, || Ok(()));
432 assert!(matches!(outcome, CachedCursorProbeOutcome::Hit(_)));
433 assert_eq!(outcome.result().unwrap().slugs[0], "gpt-5.5-high");
434 }
435
436 #[test]
437 fn stale_entry_returns_stale_outcome() {
438 let temp = TempDir::new().unwrap();
439 let path = cache_file(&temp);
440 write_entry(&path, &entry(1, Some(ok_result())));
441
442 let outcome = probe_cached_impl(false, &Some(path), fail_result, || Ok(()));
443 assert!(matches!(outcome, CachedCursorProbeOutcome::Stale(_)));
444 }
445
446 #[test]
447 fn stale_cache_preserved_on_failed_probe() {
448 let temp = TempDir::new().unwrap();
449 let path = cache_file(&temp);
450 write_entry(&path, &entry(1, Some(ok_result())));
451
452 let outcome = probe_cached_impl(false, &Some(path.clone()), fail_result, || Ok(()));
453
454 assert!(matches!(outcome, CachedCursorProbeOutcome::Stale(_)));
455
456 let on_disk = read_cache_tolerant_at(&path).unwrap();
457 assert!(on_disk.result.as_ref().unwrap().model_probe_success);
458 assert_eq!(on_disk.fetched_at, 1);
459 }
460
461 #[test]
462 fn missing_cache_runs_synchronous_probe() {
463 let temp = TempDir::new().unwrap();
464 let path = cache_file(&temp);
465 let called = Cell::new(false);
466 let outcome = probe_cached_impl(
467 false,
468 &Some(path.clone()),
469 || {
470 called.set(true);
471 ok_result()
472 },
473 || Ok(()),
474 );
475
476 assert!(called.get());
477 assert!(matches!(outcome, CachedCursorProbeOutcome::Miss(_)));
478 assert!(read_cache_tolerant_at(&path).is_some());
479 }
480
481 #[test]
482 fn invalid_json_is_cache_miss() {
483 let temp = TempDir::new().unwrap();
484 let path = cache_file(&temp);
485 std::fs::create_dir_all(path.parent().unwrap()).unwrap();
486 std::fs::write(&path, "not json").unwrap();
487
488 let outcome = probe_cached_impl(false, &Some(path), ok_result, || Ok(()));
489 assert!(matches!(outcome, CachedCursorProbeOutcome::Miss(_)));
490 }
491
492 #[test]
493 fn incompatible_schema_is_cache_miss() {
494 let temp = TempDir::new().unwrap();
495 let path = cache_file(&temp);
496 let mut old = entry(now_unix_secs(), Some(ok_result()));
497 old.schema_version = 999;
498 write_entry(&path, &old);
499
500 let outcome = probe_cached_impl(false, &Some(path), ok_result, || Ok(()));
501 assert!(matches!(outcome, CachedCursorProbeOutcome::Miss(_)));
502 }
503
504 #[test]
505 fn future_fetched_at_is_stale() {
506 let future = entry(now_unix_secs() + 3600, Some(ok_result()));
507 assert!(!is_fresh(&future));
508 }
509
510 #[test]
511 fn ttl_override_controls_freshness() {
512 let _guard = EnvGuard::set("MARS_PROBE_CACHE_TTL_SECS", "9999");
513 let recent = entry(now_unix_secs().saturating_sub(10), Some(ok_result()));
514 assert!(is_fresh(&recent));
515 }
516
517 #[test]
518 fn write_failure_degrades_gracefully() {
519 let temp = TempDir::new().unwrap();
520 let path = temp.path().join("availability");
521 std::fs::write(&path, "file blocks directory").unwrap();
522 let blocked = path.join("cursor-probe.json");
523
524 let outcome = probe_cached_impl(false, &Some(blocked), ok_result, || Ok(()));
525 assert!(matches!(outcome, CachedCursorProbeOutcome::Miss(_)));
526 }
527
528 struct EnvGuard {
529 key: &'static str,
530 prev: Option<std::ffi::OsString>,
531 }
532
533 impl EnvGuard {
534 fn set(key: &'static str, value: &str) -> Self {
535 let prev = std::env::var_os(key);
536 unsafe { std::env::set_var(key, value) };
537 Self { key, prev }
538 }
539 }
540
541 impl Drop for EnvGuard {
542 fn drop(&mut self) {
543 if let Some(prev) = &self.prev {
544 unsafe { std::env::set_var(self.key, prev) };
545 } else {
546 unsafe { std::env::remove_var(self.key) };
547 }
548 }
549 }
550}