sphereql_embed/util.rs
1//! Tiny shared helpers used across multiple modules.
2//!
3//! Kept deliberately small — when in doubt, inline instead of growing
4//! this module. The helpers live here because they'd otherwise duplicate
5//! across `meta_model.rs` and `feedback.rs` (both need timestamps on
6//! persisted records, both default their storage to `~/.sphereql/`, and
7//! both migrate legacy JSON-array stores to JSONL on first append).
8
9use std::fs;
10use std::io;
11use std::io::Read;
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, OnceLock};
14
15use indexmap::IndexMap;
16
/// Produces the timestamp stamped onto persisted records by default.
///
/// The value is the whole number of seconds since the Unix epoch,
/// rendered as a decimal string: sortable, unambiguous, and free of
/// extra dependencies. If the system clock reports a time before the
/// epoch, the result is `"0"`. Callers that prefer a human-readable
/// format should overwrite the timestamp field themselves (e.g. via
/// `with_timestamp`).
pub fn default_timestamp() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs().to_string())
        // A clock set before 1970 is the only failure mode; fall back to "0".
        .unwrap_or_else(|_| "0".to_string())
}
27
/// Resolve `~/.sphereql/`, the on-disk convention for SphereQL's
/// persistent training stores (meta_records.json, feedback_events.json).
///
/// The base directory is taken from the first of these environment
/// variables that is set, in order: `HOME` (the usual case on Unix),
/// then `USERPROFILE` (the usual case on Windows). `.sphereql` is
/// appended to it. The directory is not created or checked for
/// existence here.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::NotFound`] error only when neither
/// variable is set. That is rare, and would mean the process is
/// running without a user profile.
pub fn sphereql_home_dir() -> io::Result<PathBuf> {
    // Order matters: `HOME` wins whenever both variables are present.
    ["HOME", "USERPROFILE"]
        .iter()
        .find_map(|name| std::env::var_os(*name))
        .map(|base| PathBuf::from(base).join(".sphereql"))
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                "neither HOME nor USERPROFILE is set",
            )
        })
}
45
46/// Hard cap on the number of distinct canonicalized paths we keep
47/// migration locks for. Long-running processes that touch many
48/// stores would otherwise grow the lock map unboundedly. The
49/// per-path lock is only contended during the one-shot legacy-JSON →
50/// JSONL migration, so evicting an idle entry is safe — the next
51/// migration on that path just allocates a fresh lock.
52const MIGRATION_LOCK_CAPACITY: usize = 128;
53
54/// In-process serialization for the legacy-JSON → JSONL migration in
55/// [`migrate_legacy_array_to_jsonl`]. Two threads racing the migration
56/// would otherwise duplicate the migrated bytes (each reads the legacy
57/// array, each writes its own copy back). Keyed per canonical path so
58/// unrelated stores don't contend. Bounded LRU so a long-running daemon
59/// that rotates through many files doesn't leak.
60fn migration_lock(path: &Path) -> Arc<Mutex<()>> {
61 static LOCKS: OnceLock<Mutex<IndexMap<PathBuf, Arc<Mutex<()>>>>> = OnceLock::new();
62 let key = fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf());
63 let map = LOCKS.get_or_init(|| Mutex::new(IndexMap::new()));
64 // Panics only if another thread panicked while holding this lock, which
65 // indicates an unrecoverable process state — re-panicking is correct.
66 let mut guard = map.lock().expect("migration lock map poisoned");
67 if let Some(existing) = guard.shift_remove(&key) {
68 guard.insert(key, existing.clone());
69 return existing;
70 }
71 while guard.len() >= MIGRATION_LOCK_CAPACITY {
72 guard.shift_remove_index(0);
73 }
74 let lock = Arc::new(Mutex::new(()));
75 guard.insert(key, lock.clone());
76 lock
77}
78
/// Returns the first byte of the file at `path` that is not ASCII
/// whitespace, or `None` when the file is empty or holds nothing but
/// ASCII whitespace.
///
/// The file is read in 64-byte chunks and scanning stops at the first
/// match, so a file that starts with real content costs one short read.
///
/// # Errors
///
/// Propagates any error from opening or reading the file.
fn first_non_ws_byte(path: &Path) -> io::Result<Option<u8>> {
    let mut file = fs::File::open(path)?;
    let mut chunk = [0u8; 64];
    loop {
        // A zero-length read means end of file: nothing significant found.
        let filled = match file.read(&mut chunk)? {
            0 => return Ok(None),
            len => &chunk[..len],
        };
        let hit = filled
            .iter()
            .copied()
            .find(|byte| !byte.is_ascii_whitespace());
        if hit.is_some() {
            return Ok(hit);
        }
    }
}
92
93/// One-time migration of a legacy JSON-array store to JSONL, shared by
94/// the append paths in `meta_model.rs` and `feedback.rs`.
95///
96/// No-op unless `path` exists and its first non-whitespace byte is `[`
97/// — only that byte is needed to disambiguate, so the non-legacy hot
98/// path never reads the whole file. When a legacy array is detected,
99/// `to_jsonl` receives the full file text and must return the JSONL
100/// replacement (one record per line, trailing newline included).
101///
102/// Concurrency-safe: the migration is serialized per canonical path
103/// and the format is re-checked under the lock, so concurrent
104/// appenders don't double-migrate. The rewrite goes through a sibling
105/// temp file + rename, so a crash mid-migration leaves either the
106/// legacy array or the migrated JSONL — never a half-written mix.
107pub fn migrate_legacy_array_to_jsonl(
108 path: &Path,
109 to_jsonl: impl FnOnce(&str) -> io::Result<String>,
110) -> io::Result<()> {
111 if !path.exists() || first_non_ws_byte(path)? != Some(b'[') {
112 return Ok(());
113 }
114 let lock = migration_lock(path);
115 // Same reasoning as in migration_lock — mutex poisoning means
116 // unrecoverable state.
117 let _g = lock.lock().expect("migration lock poisoned");
118 if path.exists() && first_non_ws_byte(path)? == Some(b'[') {
119 let head = fs::read_to_string(path)?;
120 let migrated = to_jsonl(&head)?;
121 let parent = path.parent().unwrap_or_else(|| Path::new("."));
122 let mut tmp = tempfile::NamedTempFile::new_in(parent)?;
123 io::Write::write_all(&mut tmp, migrated.as_bytes())?;
124 tmp.persist(path).map_err(io::Error::other)?;
125 }
126 Ok(())
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// The default timestamp is a non-empty string of whole epoch seconds.
    #[test]
    fn default_timestamp_is_parseable_epoch_seconds() {
        let stamp = default_timestamp();
        assert!(!stamp.is_empty());
        let parsed = stamp.parse::<u64>();
        assert!(parsed.is_ok());
    }

    /// The resolved storage directory always has `.sphereql` as its
    /// final component. Requires `HOME` or `USERPROFILE` to be set.
    #[test]
    fn sphereql_home_dir_ends_in_dot_sphereql() {
        let dir = sphereql_home_dir().unwrap();
        let leaf = dir.file_name().and_then(|name| name.to_str());
        assert_eq!(leaf, Some(".sphereql"));
    }
}
145}