Skip to main content

meerkat_core/
config_runtime.rs

//! Realm-scoped config runtime with generation-based compare-and-swap (CAS) semantics.
//!
//! This wraps a [`ConfigStore`] and maintains a monotonic generation counter in
//! a sidecar state file. Writes accept an optional expected generation so clients
//! can perform optimistic concurrency control across surfaces.
6
7use crate::config::{Config, ConfigDelta, ConfigError};
8use crate::config_store::{ConfigResolvedPaths, ConfigStore, ConfigStoreMetadata};
9#[cfg(target_arch = "wasm32")]
10use crate::tokio;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15use tokio::io::AsyncWriteExt;
16use tokio::sync::Mutex;
17
/// Age beyond which an existing lock file is treated as abandoned and may be removed.
const LOCK_STALE_AFTER: Duration = Duration::from_secs(30);
/// Pause between attempts to create the lock file while another holder owns it.
const LOCK_RETRY_DELAY: Duration = Duration::from_millis(20);
/// Longest time lock acquisition waits before giving up with a timeout.
const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
21
22/// Snapshot returned by config runtime operations.
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ConfigSnapshot {
25    pub config: Config,
26    pub generation: u64,
27    pub metadata: Option<ConfigStoreMetadata>,
28}
29
30/// Wire envelope returned by config APIs across surfaces.
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ConfigEnvelope {
33    pub config: Config,
34    pub generation: u64,
35    pub realm_id: Option<String>,
36    pub instance_id: Option<String>,
37    pub backend: Option<String>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub resolved_paths: Option<ConfigResolvedPaths>,
40}
41
/// Controls whether diagnostic filesystem paths are exposed in a config envelope.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConfigEnvelopePolicy {
    /// Public shape: resolved filesystem paths are omitted.
    Public,
    /// Diagnostic shape: resolved filesystem paths are included when available.
    Diagnostic,
}
50
51impl ConfigEnvelope {
52    pub fn from_snapshot(snapshot: ConfigSnapshot, policy: ConfigEnvelopePolicy) -> Self {
53        let metadata = snapshot.metadata;
54        let resolved_paths = match policy {
55            ConfigEnvelopePolicy::Public => None,
56            ConfigEnvelopePolicy::Diagnostic => {
57                metadata.as_ref().and_then(|m| m.resolved_paths.clone())
58            }
59        };
60        Self {
61            config: snapshot.config,
62            generation: snapshot.generation,
63            realm_id: metadata.as_ref().and_then(|m| m.realm_id.clone()),
64            instance_id: metadata.as_ref().and_then(|m| m.instance_id.clone()),
65            backend: metadata.as_ref().and_then(|m| m.backend.clone()),
66            resolved_paths,
67        }
68    }
69}
70
71impl From<ConfigSnapshot> for ConfigEnvelope {
72    fn from(snapshot: ConfigSnapshot) -> Self {
73        Self::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic)
74    }
75}
76
77/// Errors returned by [`ConfigRuntime`].
78#[derive(Debug, thiserror::Error)]
79pub enum ConfigRuntimeError {
80    #[error(transparent)]
81    Config(#[from] ConfigError),
82    #[error("generation conflict: expected {expected}, current {current}")]
83    GenerationConflict { expected: u64, current: u64 },
84    #[error("io error: {0}")]
85    Io(#[from] std::io::Error),
86    #[error("json error: {0}")]
87    Json(#[from] serde_json::Error),
88    #[error("timed out acquiring config lock")]
89    LockTimeout,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93struct RuntimeState {
94    generation: u64,
95}
96
97/// File-backed config runtime with monotonic generation CAS.
98pub struct ConfigRuntime {
99    store: Arc<dyn ConfigStore>,
100    state_path: PathBuf,
101    lock_path: PathBuf,
102    process_lock: Mutex<()>,
103}
104
105impl ConfigRuntime {
106    /// Create a config runtime using an explicit state file path.
107    pub fn new(store: Arc<dyn ConfigStore>, state_path: PathBuf) -> Self {
108        let lock_path = state_path.with_extension("lock");
109        Self {
110            store,
111            state_path,
112            lock_path,
113            process_lock: Mutex::new(()),
114        }
115    }
116
117    /// Construct from store metadata when a resolved realm root is available.
118    pub fn from_store_metadata(store: Arc<dyn ConfigStore>) -> Option<Self> {
119        let root = store
120            .metadata()
121            .and_then(|m| m.resolved_paths)
122            .map(|p| PathBuf::from(p.root))?;
123        Some(Self::new(store, root.join("config_state.json")))
124    }
125
126    /// Read current config + generation.
127    pub async fn get(&self) -> Result<ConfigSnapshot, ConfigRuntimeError> {
128        let _guard = self.process_lock.lock().await;
129        let _file_lock = self.acquire_file_lock().await?;
130        let config = self.store.get().await?;
131        let generation = self.read_generation().await?;
132        Ok(ConfigSnapshot {
133            config,
134            generation,
135            metadata: self.store.metadata(),
136        })
137    }
138
139    /// Replace config with optional generation check.
140    pub async fn set(
141        &self,
142        config: Config,
143        expected_generation: Option<u64>,
144    ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
145        let _guard = self.process_lock.lock().await;
146        let _file_lock = self.acquire_file_lock().await?;
147        let current = self.read_generation().await?;
148        if let Some(expected) = expected_generation
149            && expected != current
150        {
151            return Err(ConfigRuntimeError::GenerationConflict { expected, current });
152        }
153
154        self.store.set(config.clone()).await?;
155        let next = current.saturating_add(1);
156        self.write_generation(next).await?;
157
158        Ok(ConfigSnapshot {
159            config,
160            generation: next,
161            metadata: self.store.metadata(),
162        })
163    }
164
165    /// Apply JSON merge patch with optional generation check.
166    pub async fn patch(
167        &self,
168        delta: ConfigDelta,
169        expected_generation: Option<u64>,
170    ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
171        let _guard = self.process_lock.lock().await;
172        let _file_lock = self.acquire_file_lock().await?;
173        let current = self.read_generation().await?;
174        if let Some(expected) = expected_generation
175            && expected != current
176        {
177            return Err(ConfigRuntimeError::GenerationConflict { expected, current });
178        }
179
180        let updated = self.store.patch(delta).await?;
181        let next = current.saturating_add(1);
182        self.write_generation(next).await?;
183
184        Ok(ConfigSnapshot {
185            config: updated,
186            generation: next,
187            metadata: self.store.metadata(),
188        })
189    }
190
191    async fn read_generation(&self) -> Result<u64, ConfigRuntimeError> {
192        if !tokio::fs::try_exists(&self.state_path).await? {
193            return Ok(0);
194        }
195        let raw = tokio::fs::read_to_string(&self.state_path).await?;
196        let state: RuntimeState = serde_json::from_str(&raw)?;
197        Ok(state.generation)
198    }
199
200    async fn write_generation(&self, generation: u64) -> Result<(), ConfigRuntimeError> {
201        if let Some(parent) = self.state_path.parent() {
202            tokio::fs::create_dir_all(parent).await?;
203        }
204
205        let state = RuntimeState { generation };
206        let body = serde_json::to_string_pretty(&state)?;
207        let parent = self
208            .state_path
209            .parent()
210            .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
211        let tmp = parent.join(format!(
212            ".config_state.tmp.{}",
213            crate::time_compat::new_uuid_v7()
214        ));
215        let mut file = tokio::fs::OpenOptions::new()
216            .write(true)
217            .create_new(true)
218            .open(&tmp)
219            .await?;
220        file.write_all(body.as_bytes()).await?;
221        file.sync_all().await?;
222        tokio::fs::rename(&tmp, &self.state_path).await?;
223        Ok(())
224    }
225
226    async fn acquire_file_lock(&self) -> Result<LockGuard, ConfigRuntimeError> {
227        LockGuard::acquire(&self.lock_path).await
228    }
229}
230
/// Guard representing ownership of a lock file on disk.
struct LockGuard {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
234
235impl LockGuard {
236    async fn acquire(path: &Path) -> Result<Self, ConfigRuntimeError> {
237        if let Some(parent) = path.parent() {
238            tokio::fs::create_dir_all(parent).await?;
239        }
240
241        let deadline = tokio::time::Instant::now() + LOCK_TIMEOUT;
242        loop {
243            match tokio::fs::OpenOptions::new()
244                .write(true)
245                .create_new(true)
246                .open(path)
247                .await
248            {
249                Ok(mut file) => {
250                    file.write_all(b"config-runtime-lock").await?;
251                    file.sync_all().await?;
252                    return Ok(Self {
253                        path: path.to_path_buf(),
254                    });
255                }
256                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
257                    if LockGuard::is_stale(path).await? {
258                        let _ = tokio::fs::remove_file(path).await;
259                        continue;
260                    }
261                    if tokio::time::Instant::now() >= deadline {
262                        return Err(ConfigRuntimeError::LockTimeout);
263                    }
264                    tokio::time::sleep(LOCK_RETRY_DELAY).await;
265                }
266                Err(err) => return Err(ConfigRuntimeError::Io(err)),
267            }
268        }
269    }
270
271    async fn is_stale(path: &Path) -> Result<bool, ConfigRuntimeError> {
272        match tokio::fs::metadata(path).await {
273            Ok(metadata) => {
274                let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
275                let age = SystemTime::now()
276                    .duration_since(modified)
277                    .unwrap_or_default();
278                Ok(age > LOCK_STALE_AFTER)
279            }
280            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
281            Err(err) => Err(ConfigRuntimeError::Io(err)),
282        }
283    }
284}
285
286impl Drop for LockGuard {
287    fn drop(&mut self) {
288        let _ = std::fs::remove_file(&self.path);
289    }
290}
291
292#[cfg(test)]
293#[allow(clippy::unwrap_used, clippy::expect_used)]
294mod tests {
295    use super::*;
296    use crate::{Config, ConfigDelta, MemoryConfigStore};
297    use serde_json::json;
298
299    #[tokio::test]
300    async fn generation_conflict_is_enforced() {
301        let temp = tempfile::tempdir().unwrap();
302        let store = Arc::new(MemoryConfigStore::new(Config::default()));
303        let runtime = ConfigRuntime::new(store, temp.path().join("state.json"));
304
305        let baseline = runtime.get().await.unwrap();
306        assert_eq!(baseline.generation, 0);
307
308        let mut updated = baseline.config.clone();
309        updated.agent.max_tokens_per_turn = 777;
310        let after = runtime.set(updated, Some(0)).await.unwrap();
311        assert_eq!(after.generation, 1);
312
313        let conflict = runtime
314            .patch(
315                ConfigDelta(json!({"agent": {"max_tokens_per_turn": 1000}})),
316                Some(0),
317            )
318            .await
319            .unwrap_err();
320        assert!(matches!(
321            conflict,
322            ConfigRuntimeError::GenerationConflict {
323                expected: 0,
324                current: 1
325            }
326        ));
327    }
328
329    #[tokio::test]
330    async fn concurrent_writes_with_same_expected_generation_conflict() {
331        let temp = tempfile::tempdir().unwrap();
332        let store: Arc<dyn ConfigStore> = Arc::new(MemoryConfigStore::new(Config::default()));
333        let runtime_a = Arc::new(ConfigRuntime::new(
334            Arc::clone(&store),
335            temp.path().join("state.json"),
336        ));
337        let runtime_b = Arc::new(ConfigRuntime::new(
338            Arc::clone(&store),
339            temp.path().join("state.json"),
340        ));
341
342        let task_a = {
343            let runtime = Arc::clone(&runtime_a);
344            tokio::spawn(async move {
345                runtime
346                    .patch(
347                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": 111}})),
348                        Some(0),
349                    )
350                    .await
351            })
352        };
353        let task_b = {
354            let runtime = Arc::clone(&runtime_b);
355            tokio::spawn(async move {
356                runtime
357                    .patch(
358                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": 222}})),
359                        Some(0),
360                    )
361                    .await
362            })
363        };
364
365        let res_a = task_a.await.unwrap();
366        let res_b = task_b.await.unwrap();
367
368        let ok_count = usize::from(res_a.is_ok()) + usize::from(res_b.is_ok());
369        let known_failure_count = usize::from(matches!(
370            res_a,
371            Err(ConfigRuntimeError::GenerationConflict { .. } | ConfigRuntimeError::LockTimeout)
372        )) + usize::from(matches!(
373            res_b,
374            Err(ConfigRuntimeError::GenerationConflict { .. } | ConfigRuntimeError::LockTimeout)
375        ));
376        assert!(ok_count <= 1);
377        assert_eq!(known_failure_count + ok_count, 2);
378    }
379
380    #[test]
381    fn config_envelope_policy_controls_resolved_paths_exposure() {
382        let snapshot = ConfigSnapshot {
383            config: Config::default(),
384            generation: 7,
385            metadata: Some(ConfigStoreMetadata {
386                realm_id: Some("team".to_string()),
387                instance_id: Some("instance".to_string()),
388                backend: Some("sqlite".to_string()),
389                resolved_paths: Some(ConfigResolvedPaths {
390                    root: "/tmp/root".to_string(),
391                    manifest_path: "/tmp/root/realm_manifest.json".to_string(),
392                    config_path: "/tmp/root/config.toml".to_string(),
393                    sessions_sqlite_path: Some("/tmp/root/sessions.sqlite3".to_string()),
394                    sessions_jsonl_dir: "/tmp/root/sessions_jsonl".to_string(),
395                }),
396            }),
397        };
398
399        let public = ConfigEnvelope::from_snapshot(snapshot.clone(), ConfigEnvelopePolicy::Public);
400        assert!(public.resolved_paths.is_none());
401
402        let diagnostic = ConfigEnvelope::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic);
403        assert!(diagnostic.resolved_paths.is_some());
404    }
405}