Skip to main content

meerkat_core/
config_runtime.rs

//! Realm-scoped config runtime with generation-based compare-and-swap (CAS) semantics.
//!
//! This wraps a [`ConfigStore`] and maintains a monotonic generation counter in
//! a sidecar state file. Writes accept an optional expected generation, so
//! clients on any surface can use optimistic concurrency control.
6
7use crate::config::{Config, ConfigDelta, ConfigError};
8use crate::config_store::{ConfigResolvedPaths, ConfigStore, ConfigStoreMetadata};
9#[cfg(target_arch = "wasm32")]
10use crate::tokio;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15use tokio::io::AsyncWriteExt;
16use tokio::sync::Mutex;
17use uuid::Uuid;
18
/// Age (by modification time) after which an existing lock file is treated as
/// abandoned and may be removed by another waiter.
const LOCK_STALE_AFTER: Duration = Duration::from_secs(30);
/// Pause between attempts while the lock file is held by someone else.
const LOCK_RETRY_DELAY: Duration = Duration::from_millis(20);
/// Upper bound on how long lock acquisition keeps retrying before giving up.
const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
22
23/// Snapshot returned by config runtime operations.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct ConfigSnapshot {
26    pub config: Config,
27    pub generation: u64,
28    pub metadata: Option<ConfigStoreMetadata>,
29}
30
31/// Wire envelope returned by config APIs across surfaces.
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct ConfigEnvelope {
34    pub config: Config,
35    pub generation: u64,
36    pub realm_id: Option<String>,
37    pub instance_id: Option<String>,
38    pub backend: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub resolved_paths: Option<ConfigResolvedPaths>,
41}
42
/// Decides whether a [`ConfigEnvelope`] carries resolved filesystem paths.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConfigEnvelopePolicy {
    /// Public shape: resolved filesystem paths are never included.
    Public,
    /// Diagnostic shape: resolved filesystem paths are included when the store
    /// metadata provides them.
    Diagnostic,
}
51
52impl ConfigEnvelope {
53    pub fn from_snapshot(snapshot: ConfigSnapshot, policy: ConfigEnvelopePolicy) -> Self {
54        let metadata = snapshot.metadata;
55        let resolved_paths = match policy {
56            ConfigEnvelopePolicy::Public => None,
57            ConfigEnvelopePolicy::Diagnostic => {
58                metadata.as_ref().and_then(|m| m.resolved_paths.clone())
59            }
60        };
61        Self {
62            config: snapshot.config,
63            generation: snapshot.generation,
64            realm_id: metadata.as_ref().and_then(|m| m.realm_id.clone()),
65            instance_id: metadata.as_ref().and_then(|m| m.instance_id.clone()),
66            backend: metadata.as_ref().and_then(|m| m.backend.clone()),
67            resolved_paths,
68        }
69    }
70}
71
72impl From<ConfigSnapshot> for ConfigEnvelope {
73    fn from(snapshot: ConfigSnapshot) -> Self {
74        Self::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic)
75    }
76}
77
78/// Errors returned by [`ConfigRuntime`].
79#[derive(Debug, thiserror::Error)]
80pub enum ConfigRuntimeError {
81    #[error(transparent)]
82    Config(#[from] ConfigError),
83    #[error("generation conflict: expected {expected}, current {current}")]
84    GenerationConflict { expected: u64, current: u64 },
85    #[error("io error: {0}")]
86    Io(#[from] std::io::Error),
87    #[error("json error: {0}")]
88    Json(#[from] serde_json::Error),
89    #[error("timed out acquiring config lock")]
90    LockTimeout,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94struct RuntimeState {
95    generation: u64,
96}
97
98/// File-backed config runtime with monotonic generation CAS.
99pub struct ConfigRuntime {
100    store: Arc<dyn ConfigStore>,
101    state_path: PathBuf,
102    lock_path: PathBuf,
103    process_lock: Mutex<()>,
104}
105
106impl ConfigRuntime {
107    /// Create a config runtime using an explicit state file path.
108    pub fn new(store: Arc<dyn ConfigStore>, state_path: PathBuf) -> Self {
109        let lock_path = state_path.with_extension("lock");
110        Self {
111            store,
112            state_path,
113            lock_path,
114            process_lock: Mutex::new(()),
115        }
116    }
117
118    /// Construct from store metadata when a resolved realm root is available.
119    pub fn from_store_metadata(store: Arc<dyn ConfigStore>) -> Option<Self> {
120        let root = store
121            .metadata()
122            .and_then(|m| m.resolved_paths)
123            .map(|p| PathBuf::from(p.root))?;
124        Some(Self::new(store, root.join("config_state.json")))
125    }
126
127    /// Read current config + generation.
128    pub async fn get(&self) -> Result<ConfigSnapshot, ConfigRuntimeError> {
129        let _guard = self.process_lock.lock().await;
130        let _file_lock = self.acquire_file_lock().await?;
131        let config = self.store.get().await?;
132        let generation = self.read_generation().await?;
133        Ok(ConfigSnapshot {
134            config,
135            generation,
136            metadata: self.store.metadata(),
137        })
138    }
139
140    /// Replace config with optional generation check.
141    pub async fn set(
142        &self,
143        config: Config,
144        expected_generation: Option<u64>,
145    ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
146        let _guard = self.process_lock.lock().await;
147        let _file_lock = self.acquire_file_lock().await?;
148        let current = self.read_generation().await?;
149        if let Some(expected) = expected_generation
150            && expected != current
151        {
152            return Err(ConfigRuntimeError::GenerationConflict { expected, current });
153        }
154
155        self.store.set(config.clone()).await?;
156        let next = current.saturating_add(1);
157        self.write_generation(next).await?;
158
159        Ok(ConfigSnapshot {
160            config,
161            generation: next,
162            metadata: self.store.metadata(),
163        })
164    }
165
166    /// Apply JSON merge patch with optional generation check.
167    pub async fn patch(
168        &self,
169        delta: ConfigDelta,
170        expected_generation: Option<u64>,
171    ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
172        let _guard = self.process_lock.lock().await;
173        let _file_lock = self.acquire_file_lock().await?;
174        let current = self.read_generation().await?;
175        if let Some(expected) = expected_generation
176            && expected != current
177        {
178            return Err(ConfigRuntimeError::GenerationConflict { expected, current });
179        }
180
181        let updated = self.store.patch(delta).await?;
182        let next = current.saturating_add(1);
183        self.write_generation(next).await?;
184
185        Ok(ConfigSnapshot {
186            config: updated,
187            generation: next,
188            metadata: self.store.metadata(),
189        })
190    }
191
192    async fn read_generation(&self) -> Result<u64, ConfigRuntimeError> {
193        if !tokio::fs::try_exists(&self.state_path).await? {
194            return Ok(0);
195        }
196        let raw = tokio::fs::read_to_string(&self.state_path).await?;
197        let state: RuntimeState = serde_json::from_str(&raw)?;
198        Ok(state.generation)
199    }
200
201    async fn write_generation(&self, generation: u64) -> Result<(), ConfigRuntimeError> {
202        if let Some(parent) = self.state_path.parent() {
203            tokio::fs::create_dir_all(parent).await?;
204        }
205
206        let state = RuntimeState { generation };
207        let body = serde_json::to_string_pretty(&state)?;
208        let parent = self
209            .state_path
210            .parent()
211            .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
212        let tmp = parent.join(format!(".config_state.tmp.{}", Uuid::now_v7()));
213        let mut file = tokio::fs::OpenOptions::new()
214            .write(true)
215            .create_new(true)
216            .open(&tmp)
217            .await?;
218        file.write_all(body.as_bytes()).await?;
219        file.sync_all().await?;
220        tokio::fs::rename(&tmp, &self.state_path).await?;
221        Ok(())
222    }
223
224    async fn acquire_file_lock(&self) -> Result<LockGuard, ConfigRuntimeError> {
225        LockGuard::acquire(&self.lock_path).await
226    }
227}
228
/// Holds the lock file at `path`; dropping the guard deletes the file and
/// thereby releases the lock.
#[must_use = "dropping the guard releases the lock file immediately"]
struct LockGuard {
    /// Location of the lock file this guard owns.
    path: PathBuf,
}
232
233impl LockGuard {
234    async fn acquire(path: &Path) -> Result<Self, ConfigRuntimeError> {
235        if let Some(parent) = path.parent() {
236            tokio::fs::create_dir_all(parent).await?;
237        }
238
239        let deadline = tokio::time::Instant::now() + LOCK_TIMEOUT;
240        loop {
241            match tokio::fs::OpenOptions::new()
242                .write(true)
243                .create_new(true)
244                .open(path)
245                .await
246            {
247                Ok(mut file) => {
248                    file.write_all(b"config-runtime-lock").await?;
249                    file.sync_all().await?;
250                    return Ok(Self {
251                        path: path.to_path_buf(),
252                    });
253                }
254                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
255                    if LockGuard::is_stale(path).await? {
256                        let _ = tokio::fs::remove_file(path).await;
257                        continue;
258                    }
259                    if tokio::time::Instant::now() >= deadline {
260                        return Err(ConfigRuntimeError::LockTimeout);
261                    }
262                    tokio::time::sleep(LOCK_RETRY_DELAY).await;
263                }
264                Err(err) => return Err(ConfigRuntimeError::Io(err)),
265            }
266        }
267    }
268
269    async fn is_stale(path: &Path) -> Result<bool, ConfigRuntimeError> {
270        match tokio::fs::metadata(path).await {
271            Ok(metadata) => {
272                let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
273                let age = SystemTime::now()
274                    .duration_since(modified)
275                    .unwrap_or_default();
276                Ok(age > LOCK_STALE_AFTER)
277            }
278            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
279            Err(err) => Err(ConfigRuntimeError::Io(err)),
280        }
281    }
282}
283
284impl Drop for LockGuard {
285    fn drop(&mut self) {
286        let _ = std::fs::remove_file(&self.path);
287    }
288}
289
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::{Config, ConfigDelta, MemoryConfigStore};
    use serde_json::json;

    /// Error kinds a writer that loses a concurrent race is allowed to see.
    fn is_expected_contention_error(outcome: &Result<ConfigSnapshot, ConfigRuntimeError>) -> bool {
        matches!(
            outcome,
            Err(ConfigRuntimeError::GenerationConflict { .. } | ConfigRuntimeError::LockTimeout)
        )
    }

    #[tokio::test]
    async fn generation_conflict_is_enforced() {
        let dir = tempfile::tempdir().unwrap();
        let runtime = ConfigRuntime::new(
            Arc::new(MemoryConfigStore::new(Config::default())),
            dir.path().join("state.json"),
        );

        let initial = runtime.get().await.unwrap();
        assert_eq!(initial.generation, 0);

        let mut next_config = initial.config;
        next_config.agent.max_tokens_per_turn = 777;
        let written = runtime.set(next_config, Some(0)).await.unwrap();
        assert_eq!(written.generation, 1);

        let outdated_patch = ConfigDelta(json!({"agent": {"max_tokens_per_turn": 1000}}));
        let err = runtime.patch(outdated_patch, Some(0)).await.unwrap_err();
        assert!(
            matches!(
                err,
                ConfigRuntimeError::GenerationConflict {
                    expected: 0,
                    current: 1
                }
            ),
            "unexpected error: {err:?}"
        );
    }

    #[tokio::test]
    async fn concurrent_writes_with_same_expected_generation_conflict() {
        let dir = tempfile::tempdir().unwrap();
        let state_path = dir.path().join("state.json");
        let shared_store: Arc<dyn ConfigStore> =
            Arc::new(MemoryConfigStore::new(Config::default()));

        // Each writer gets its own runtime over the same store and state file.
        let spawn_patch = |max_tokens: u64| {
            let runtime = ConfigRuntime::new(Arc::clone(&shared_store), state_path.clone());
            tokio::spawn(async move {
                runtime
                    .patch(
                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": max_tokens}})),
                        Some(0),
                    )
                    .await
            })
        };

        let first = spawn_patch(111);
        let second = spawn_patch(222);
        let outcomes = [first.await.unwrap(), second.await.unwrap()];

        let successes = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
        let contention = outcomes
            .iter()
            .filter(|outcome| is_expected_contention_error(outcome))
            .count();
        assert!(successes <= 1);
        assert_eq!(successes + contention, 2);
    }

    #[test]
    fn config_envelope_policy_controls_resolved_paths_exposure() {
        let paths = ConfigResolvedPaths {
            root: "/tmp/root".to_string(),
            manifest_path: "/tmp/root/realm_manifest.json".to_string(),
            config_path: "/tmp/root/config.toml".to_string(),
            sessions_sqlite_path: Some("/tmp/root/sessions.sqlite3".to_string()),
            sessions_jsonl_dir: "/tmp/root/sessions_jsonl".to_string(),
        };
        let metadata = ConfigStoreMetadata {
            realm_id: Some("team".to_string()),
            instance_id: Some("instance".to_string()),
            backend: Some("sqlite".to_string()),
            resolved_paths: Some(paths),
        };
        let snapshot = ConfigSnapshot {
            config: Config::default(),
            generation: 7,
            metadata: Some(metadata),
        };

        let public = ConfigEnvelope::from_snapshot(snapshot.clone(), ConfigEnvelopePolicy::Public);
        assert!(public.resolved_paths.is_none());

        let diagnostic = ConfigEnvelope::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic);
        assert!(diagnostic.resolved_paths.is_some());
    }
}