1use crate::config::{Config, ConfigDelta, ConfigError};
8use crate::config_store::{ConfigResolvedPaths, ConfigStore, ConfigStoreMetadata};
9#[cfg(target_arch = "wasm32")]
10use crate::tokio;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15use tokio::io::AsyncWriteExt;
16use tokio::sync::Mutex;
17use uuid::Uuid;
18
/// Age (by modification time) beyond which an existing lock file is treated as abandoned
/// and may be removed by the next process trying to acquire the lock.
const LOCK_STALE_AFTER: Duration = Duration::from_secs(30);
/// Pause between attempts to create the lock file while it is held by someone else.
const LOCK_RETRY_DELAY: Duration = Duration::from_millis(20);
/// How long lock acquisition keeps retrying before failing with a lock timeout.
const LOCK_TIMEOUT: Duration = Duration::from_secs(5);
22
/// A config value paired with the generation it was read or written at.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigSnapshot {
    /// The configuration as returned by (or written to) the backing store.
    pub config: Config,
    /// Generation counter associated with `config`; it is 0 until the first write.
    pub generation: u64,
    /// Metadata reported by the backing store, if it provides any.
    pub metadata: Option<ConfigStoreMetadata>,
}
30
/// Flattened, serializable view of a [`ConfigSnapshot`].
///
/// The store metadata is unpacked into individual optional fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigEnvelope {
    /// The configuration carried by the snapshot.
    pub config: Config,
    /// Generation counter carried by the snapshot.
    pub generation: u64,
    /// `realm_id` from the store metadata, when present.
    pub realm_id: Option<String>,
    /// `instance_id` from the store metadata, when present.
    pub instance_id: Option<String>,
    /// `backend` from the store metadata, when present.
    pub backend: Option<String>,
    /// Resolved filesystem paths from the store metadata.
    ///
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resolved_paths: Option<ConfigResolvedPaths>,
}
42
/// Controls whether resolved paths are copied into a [`ConfigEnvelope`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigEnvelopePolicy {
    /// Never include resolved paths in the envelope.
    Public,
    /// Include resolved paths when the store metadata provides them.
    Diagnostic,
}
51
52impl ConfigEnvelope {
53 pub fn from_snapshot(snapshot: ConfigSnapshot, policy: ConfigEnvelopePolicy) -> Self {
54 let metadata = snapshot.metadata;
55 let resolved_paths = match policy {
56 ConfigEnvelopePolicy::Public => None,
57 ConfigEnvelopePolicy::Diagnostic => {
58 metadata.as_ref().and_then(|m| m.resolved_paths.clone())
59 }
60 };
61 Self {
62 config: snapshot.config,
63 generation: snapshot.generation,
64 realm_id: metadata.as_ref().and_then(|m| m.realm_id.clone()),
65 instance_id: metadata.as_ref().and_then(|m| m.instance_id.clone()),
66 backend: metadata.as_ref().and_then(|m| m.backend.clone()),
67 resolved_paths,
68 }
69 }
70}
71
72impl From<ConfigSnapshot> for ConfigEnvelope {
73 fn from(snapshot: ConfigSnapshot) -> Self {
74 Self::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic)
75 }
76}
77
/// Errors produced by [`ConfigRuntime`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ConfigRuntimeError {
    /// An error reported by the underlying config layer, passed through unchanged.
    #[error(transparent)]
    Config(#[from] ConfigError),
    /// The caller's expected generation did not match the persisted one.
    #[error("generation conflict: expected {expected}, current {current}")]
    GenerationConflict { expected: u64, current: u64 },
    /// A filesystem operation on the state file or lock file failed.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// The state file could not be serialized or parsed as JSON.
    #[error("json error: {0}")]
    Json(#[from] serde_json::Error),
    /// The lock file could not be acquired before the lock timeout elapsed.
    #[error("timed out acquiring config lock")]
    LockTimeout,
}
92
/// On-disk contents of the state file, stored as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RuntimeState {
    /// Generation counter; incremented after each successful `set` or `patch`.
    generation: u64,
}
97
/// Wraps a [`ConfigStore`] with a persisted generation counter.
///
/// Every operation takes an in-process mutex and then a lock file on disk before touching
/// the store or the state file.
pub struct ConfigRuntime {
    /// Backing store that holds the actual configuration.
    store: Arc<dyn ConfigStore>,
    /// JSON file holding the generation counter.
    state_path: PathBuf,
    /// Lock file path, derived from `state_path` by swapping its extension for `lock`.
    lock_path: PathBuf,
    /// Serializes operations issued through this instance.
    process_lock: Mutex<()>,
}
105
106impl ConfigRuntime {
107 pub fn new(store: Arc<dyn ConfigStore>, state_path: PathBuf) -> Self {
109 let lock_path = state_path.with_extension("lock");
110 Self {
111 store,
112 state_path,
113 lock_path,
114 process_lock: Mutex::new(()),
115 }
116 }
117
118 pub fn from_store_metadata(store: Arc<dyn ConfigStore>) -> Option<Self> {
120 let root = store
121 .metadata()
122 .and_then(|m| m.resolved_paths)
123 .map(|p| PathBuf::from(p.root))?;
124 Some(Self::new(store, root.join("config_state.json")))
125 }
126
127 pub async fn get(&self) -> Result<ConfigSnapshot, ConfigRuntimeError> {
129 let _guard = self.process_lock.lock().await;
130 let _file_lock = self.acquire_file_lock().await?;
131 let config = self.store.get().await?;
132 let generation = self.read_generation().await?;
133 Ok(ConfigSnapshot {
134 config,
135 generation,
136 metadata: self.store.metadata(),
137 })
138 }
139
140 pub async fn set(
142 &self,
143 config: Config,
144 expected_generation: Option<u64>,
145 ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
146 let _guard = self.process_lock.lock().await;
147 let _file_lock = self.acquire_file_lock().await?;
148 let current = self.read_generation().await?;
149 if let Some(expected) = expected_generation
150 && expected != current
151 {
152 return Err(ConfigRuntimeError::GenerationConflict { expected, current });
153 }
154
155 self.store.set(config.clone()).await?;
156 let next = current.saturating_add(1);
157 self.write_generation(next).await?;
158
159 Ok(ConfigSnapshot {
160 config,
161 generation: next,
162 metadata: self.store.metadata(),
163 })
164 }
165
166 pub async fn patch(
168 &self,
169 delta: ConfigDelta,
170 expected_generation: Option<u64>,
171 ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
172 let _guard = self.process_lock.lock().await;
173 let _file_lock = self.acquire_file_lock().await?;
174 let current = self.read_generation().await?;
175 if let Some(expected) = expected_generation
176 && expected != current
177 {
178 return Err(ConfigRuntimeError::GenerationConflict { expected, current });
179 }
180
181 let updated = self.store.patch(delta).await?;
182 let next = current.saturating_add(1);
183 self.write_generation(next).await?;
184
185 Ok(ConfigSnapshot {
186 config: updated,
187 generation: next,
188 metadata: self.store.metadata(),
189 })
190 }
191
192 async fn read_generation(&self) -> Result<u64, ConfigRuntimeError> {
193 if !tokio::fs::try_exists(&self.state_path).await? {
194 return Ok(0);
195 }
196 let raw = tokio::fs::read_to_string(&self.state_path).await?;
197 let state: RuntimeState = serde_json::from_str(&raw)?;
198 Ok(state.generation)
199 }
200
201 async fn write_generation(&self, generation: u64) -> Result<(), ConfigRuntimeError> {
202 if let Some(parent) = self.state_path.parent() {
203 tokio::fs::create_dir_all(parent).await?;
204 }
205
206 let state = RuntimeState { generation };
207 let body = serde_json::to_string_pretty(&state)?;
208 let parent = self
209 .state_path
210 .parent()
211 .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
212 let tmp = parent.join(format!(".config_state.tmp.{}", Uuid::now_v7()));
213 let mut file = tokio::fs::OpenOptions::new()
214 .write(true)
215 .create_new(true)
216 .open(&tmp)
217 .await?;
218 file.write_all(body.as_bytes()).await?;
219 file.sync_all().await?;
220 tokio::fs::rename(&tmp, &self.state_path).await?;
221 Ok(())
222 }
223
224 async fn acquire_file_lock(&self) -> Result<LockGuard, ConfigRuntimeError> {
225 LockGuard::acquire(&self.lock_path).await
226 }
227}
228
/// Holds the lock file at `path`; the file is removed when the guard is dropped.
struct LockGuard {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
232
233impl LockGuard {
234 async fn acquire(path: &Path) -> Result<Self, ConfigRuntimeError> {
235 if let Some(parent) = path.parent() {
236 tokio::fs::create_dir_all(parent).await?;
237 }
238
239 let deadline = tokio::time::Instant::now() + LOCK_TIMEOUT;
240 loop {
241 match tokio::fs::OpenOptions::new()
242 .write(true)
243 .create_new(true)
244 .open(path)
245 .await
246 {
247 Ok(mut file) => {
248 file.write_all(b"config-runtime-lock").await?;
249 file.sync_all().await?;
250 return Ok(Self {
251 path: path.to_path_buf(),
252 });
253 }
254 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
255 if LockGuard::is_stale(path).await? {
256 let _ = tokio::fs::remove_file(path).await;
257 continue;
258 }
259 if tokio::time::Instant::now() >= deadline {
260 return Err(ConfigRuntimeError::LockTimeout);
261 }
262 tokio::time::sleep(LOCK_RETRY_DELAY).await;
263 }
264 Err(err) => return Err(ConfigRuntimeError::Io(err)),
265 }
266 }
267 }
268
269 async fn is_stale(path: &Path) -> Result<bool, ConfigRuntimeError> {
270 match tokio::fs::metadata(path).await {
271 Ok(metadata) => {
272 let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
273 let age = SystemTime::now()
274 .duration_since(modified)
275 .unwrap_or_default();
276 Ok(age > LOCK_STALE_AFTER)
277 }
278 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
279 Err(err) => Err(ConfigRuntimeError::Io(err)),
280 }
281 }
282}
283
284impl Drop for LockGuard {
285 fn drop(&mut self) {
286 let _ = std::fs::remove_file(&self.path);
287 }
288}
289
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::{Config, ConfigDelta, MemoryConfigStore};
    use serde_json::json;

    /// A write made against an outdated generation must be rejected with the expected and
    /// current generations reported.
    #[tokio::test]
    async fn generation_conflict_is_enforced() {
        let dir = tempfile::tempdir().unwrap();
        let store = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime = ConfigRuntime::new(store, dir.path().join("state.json"));

        let initial = runtime.get().await.unwrap();
        assert_eq!(initial.generation, 0);

        let mut desired = initial.config.clone();
        desired.agent.max_tokens_per_turn = 777;
        let written = runtime.set(desired, Some(0)).await.unwrap();
        assert_eq!(written.generation, 1);

        // Generation is now 1, so a patch that still expects 0 has to fail.
        let outdated = ConfigDelta(json!({"agent": {"max_tokens_per_turn": 1000}}));
        let err = runtime.patch(outdated, Some(0)).await.unwrap_err();
        assert!(matches!(
            err,
            ConfigRuntimeError::GenerationConflict {
                expected: 0,
                current: 1
            }
        ));
    }

    /// Two runtimes sharing one state file race to patch from generation 0: at most one may
    /// win, and the loser must fail with a conflict or a lock timeout.
    #[tokio::test]
    async fn concurrent_writes_with_same_expected_generation_conflict() {
        let dir = tempfile::tempdir().unwrap();
        let store: Arc<dyn ConfigStore> = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime_a = Arc::new(ConfigRuntime::new(
            Arc::clone(&store),
            dir.path().join("state.json"),
        ));
        let runtime_b = Arc::new(ConfigRuntime::new(
            Arc::clone(&store),
            dir.path().join("state.json"),
        ));

        let spawn_patch = |runtime: Arc<ConfigRuntime>, tokens: u64| {
            tokio::spawn(async move {
                runtime
                    .patch(
                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": tokens}})),
                        Some(0),
                    )
                    .await
            })
        };

        let task_a = spawn_patch(Arc::clone(&runtime_a), 111);
        let task_b = spawn_patch(Arc::clone(&runtime_b), 222);

        let outcomes = [task_a.await.unwrap(), task_b.await.unwrap()];

        let ok_count = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
        let known_failure_count = outcomes
            .iter()
            .filter(|outcome| {
                matches!(
                    outcome,
                    Err(ConfigRuntimeError::GenerationConflict { .. }
                        | ConfigRuntimeError::LockTimeout)
                )
            })
            .count();

        assert!(ok_count <= 1);
        assert_eq!(known_failure_count + ok_count, 2);
    }

    /// `Public` envelopes hide resolved paths; `Diagnostic` envelopes expose them.
    #[test]
    fn config_envelope_policy_controls_resolved_paths_exposure() {
        let paths = ConfigResolvedPaths {
            root: "/tmp/root".to_string(),
            manifest_path: "/tmp/root/realm_manifest.json".to_string(),
            config_path: "/tmp/root/config.toml".to_string(),
            sessions_sqlite_path: Some("/tmp/root/sessions.sqlite3".to_string()),
            sessions_jsonl_dir: "/tmp/root/sessions_jsonl".to_string(),
        };
        let metadata = ConfigStoreMetadata {
            realm_id: Some("team".to_string()),
            instance_id: Some("instance".to_string()),
            backend: Some("sqlite".to_string()),
            resolved_paths: Some(paths),
        };
        let snapshot = ConfigSnapshot {
            config: Config::default(),
            generation: 7,
            metadata: Some(metadata),
        };

        let public = ConfigEnvelope::from_snapshot(snapshot.clone(), ConfigEnvelopePolicy::Public);
        assert!(public.resolved_paths.is_none());

        let diagnostic = ConfigEnvelope::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic);
        assert!(diagnostic.resolved_paths.is_some());
    }
}