1use crate::config::{Config, ConfigDelta, ConfigError};
8use crate::config_store::{ConfigResolvedPaths, ConfigStore, ConfigStoreMetadata};
9#[cfg(target_arch = "wasm32")]
10use crate::tokio;
11use serde::{Deserialize, Serialize};
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15use tokio::io::AsyncWriteExt;
16use tokio::sync::Mutex;
17
/// Age (by modification time) after which an existing lock file is assumed to be
/// abandoned and may be removed by a waiting caller.
const LOCK_STALE_AFTER: Duration = Duration::from_millis(30_000);
/// Pause between attempts to create the lock file while another holder owns it.
const LOCK_RETRY_DELAY: Duration = Duration::from_millis(20);
/// Maximum time spent waiting for the lock before reporting `LockTimeout`.
const LOCK_TIMEOUT: Duration = Duration::from_millis(5_000);
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ConfigSnapshot {
25 pub config: Config,
26 pub generation: u64,
27 pub metadata: Option<ConfigStoreMetadata>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct ConfigEnvelope {
33 pub config: Config,
34 pub generation: u64,
35 pub realm_id: Option<String>,
36 pub instance_id: Option<String>,
37 pub backend: Option<String>,
38 #[serde(skip_serializing_if = "Option::is_none")]
39 pub resolved_paths: Option<ConfigResolvedPaths>,
40}
41
/// Selects how much store metadata [`ConfigEnvelope::from_snapshot`] exposes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConfigEnvelopePolicy {
    /// Never include `resolved_paths` (filesystem locations) in the envelope.
    Public,
    /// Include `resolved_paths` whenever the store metadata carries them.
    Diagnostic,
}
50
51impl ConfigEnvelope {
52 pub fn from_snapshot(snapshot: ConfigSnapshot, policy: ConfigEnvelopePolicy) -> Self {
53 let metadata = snapshot.metadata;
54 let resolved_paths = match policy {
55 ConfigEnvelopePolicy::Public => None,
56 ConfigEnvelopePolicy::Diagnostic => {
57 metadata.as_ref().and_then(|m| m.resolved_paths.clone())
58 }
59 };
60 Self {
61 config: snapshot.config,
62 generation: snapshot.generation,
63 realm_id: metadata.as_ref().and_then(|m| m.realm_id.clone()),
64 instance_id: metadata.as_ref().and_then(|m| m.instance_id.clone()),
65 backend: metadata.as_ref().and_then(|m| m.backend.clone()),
66 resolved_paths,
67 }
68 }
69}
70
71impl From<ConfigSnapshot> for ConfigEnvelope {
72 fn from(snapshot: ConfigSnapshot) -> Self {
73 Self::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic)
74 }
75}
76
77#[derive(Debug, thiserror::Error)]
79pub enum ConfigRuntimeError {
80 #[error(transparent)]
81 Config(#[from] ConfigError),
82 #[error("generation conflict: expected {expected}, current {current}")]
83 GenerationConflict { expected: u64, current: u64 },
84 #[error("io error: {0}")]
85 Io(#[from] std::io::Error),
86 #[error("json error: {0}")]
87 Json(#[from] serde_json::Error),
88 #[error("timed out acquiring config lock")]
89 LockTimeout,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93struct RuntimeState {
94 generation: u64,
95}
96
97pub struct ConfigRuntime {
99 store: Arc<dyn ConfigStore>,
100 state_path: PathBuf,
101 lock_path: PathBuf,
102 process_lock: Mutex<()>,
103}
104
105impl ConfigRuntime {
106 pub fn new(store: Arc<dyn ConfigStore>, state_path: PathBuf) -> Self {
108 let lock_path = state_path.with_extension("lock");
109 Self {
110 store,
111 state_path,
112 lock_path,
113 process_lock: Mutex::new(()),
114 }
115 }
116
117 pub fn from_store_metadata(store: Arc<dyn ConfigStore>) -> Option<Self> {
119 let root = store
120 .metadata()
121 .and_then(|m| m.resolved_paths)
122 .map(|p| PathBuf::from(p.root))?;
123 Some(Self::new(store, root.join("config_state.json")))
124 }
125
126 pub async fn get(&self) -> Result<ConfigSnapshot, ConfigRuntimeError> {
128 let _guard = self.process_lock.lock().await;
129 let _file_lock = self.acquire_file_lock().await?;
130 let config = self.store.get().await?;
131 let generation = self.read_generation().await?;
132 Ok(ConfigSnapshot {
133 config,
134 generation,
135 metadata: self.store.metadata(),
136 })
137 }
138
139 pub async fn set(
141 &self,
142 config: Config,
143 expected_generation: Option<u64>,
144 ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
145 let _guard = self.process_lock.lock().await;
146 let _file_lock = self.acquire_file_lock().await?;
147 let current = self.read_generation().await?;
148 if let Some(expected) = expected_generation
149 && expected != current
150 {
151 return Err(ConfigRuntimeError::GenerationConflict { expected, current });
152 }
153
154 self.store.set(config.clone()).await?;
155 let next = current.saturating_add(1);
156 self.write_generation(next).await?;
157
158 Ok(ConfigSnapshot {
159 config,
160 generation: next,
161 metadata: self.store.metadata(),
162 })
163 }
164
165 pub async fn patch(
167 &self,
168 delta: ConfigDelta,
169 expected_generation: Option<u64>,
170 ) -> Result<ConfigSnapshot, ConfigRuntimeError> {
171 let _guard = self.process_lock.lock().await;
172 let _file_lock = self.acquire_file_lock().await?;
173 let current = self.read_generation().await?;
174 if let Some(expected) = expected_generation
175 && expected != current
176 {
177 return Err(ConfigRuntimeError::GenerationConflict { expected, current });
178 }
179
180 let updated = self.store.patch(delta).await?;
181 let next = current.saturating_add(1);
182 self.write_generation(next).await?;
183
184 Ok(ConfigSnapshot {
185 config: updated,
186 generation: next,
187 metadata: self.store.metadata(),
188 })
189 }
190
191 async fn read_generation(&self) -> Result<u64, ConfigRuntimeError> {
192 if !tokio::fs::try_exists(&self.state_path).await? {
193 return Ok(0);
194 }
195 let raw = tokio::fs::read_to_string(&self.state_path).await?;
196 let state: RuntimeState = serde_json::from_str(&raw)?;
197 Ok(state.generation)
198 }
199
200 async fn write_generation(&self, generation: u64) -> Result<(), ConfigRuntimeError> {
201 if let Some(parent) = self.state_path.parent() {
202 tokio::fs::create_dir_all(parent).await?;
203 }
204
205 let state = RuntimeState { generation };
206 let body = serde_json::to_string_pretty(&state)?;
207 let parent = self
208 .state_path
209 .parent()
210 .map_or_else(|| PathBuf::from("."), Path::to_path_buf);
211 let tmp = parent.join(format!(
212 ".config_state.tmp.{}",
213 crate::time_compat::new_uuid_v7()
214 ));
215 let mut file = tokio::fs::OpenOptions::new()
216 .write(true)
217 .create_new(true)
218 .open(&tmp)
219 .await?;
220 file.write_all(body.as_bytes()).await?;
221 file.sync_all().await?;
222 tokio::fs::rename(&tmp, &self.state_path).await?;
223 Ok(())
224 }
225
226 async fn acquire_file_lock(&self) -> Result<LockGuard, ConfigRuntimeError> {
227 LockGuard::acquire(&self.lock_path).await
228 }
229}
230
/// Holds an exclusively created lock file and deletes it when dropped.
struct LockGuard {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
234
235impl LockGuard {
236 async fn acquire(path: &Path) -> Result<Self, ConfigRuntimeError> {
237 if let Some(parent) = path.parent() {
238 tokio::fs::create_dir_all(parent).await?;
239 }
240
241 let deadline = tokio::time::Instant::now() + LOCK_TIMEOUT;
242 loop {
243 match tokio::fs::OpenOptions::new()
244 .write(true)
245 .create_new(true)
246 .open(path)
247 .await
248 {
249 Ok(mut file) => {
250 file.write_all(b"config-runtime-lock").await?;
251 file.sync_all().await?;
252 return Ok(Self {
253 path: path.to_path_buf(),
254 });
255 }
256 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
257 if LockGuard::is_stale(path).await? {
258 let _ = tokio::fs::remove_file(path).await;
259 continue;
260 }
261 if tokio::time::Instant::now() >= deadline {
262 return Err(ConfigRuntimeError::LockTimeout);
263 }
264 tokio::time::sleep(LOCK_RETRY_DELAY).await;
265 }
266 Err(err) => return Err(ConfigRuntimeError::Io(err)),
267 }
268 }
269 }
270
271 async fn is_stale(path: &Path) -> Result<bool, ConfigRuntimeError> {
272 match tokio::fs::metadata(path).await {
273 Ok(metadata) => {
274 let modified = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
275 let age = SystemTime::now()
276 .duration_since(modified)
277 .unwrap_or_default();
278 Ok(age > LOCK_STALE_AFTER)
279 }
280 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
281 Err(err) => Err(ConfigRuntimeError::Io(err)),
282 }
283 }
284}
285
286impl Drop for LockGuard {
287 fn drop(&mut self) {
288 let _ = std::fs::remove_file(&self.path);
289 }
290}
291
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use crate::{Config, ConfigDelta, MemoryConfigStore};
    use serde_json::json;

    /// A write carrying an outdated expected generation is rejected.
    #[tokio::test]
    async fn generation_conflict_is_enforced() {
        let temp = tempfile::tempdir().unwrap();
        let store = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime = ConfigRuntime::new(store, temp.path().join("state.json"));

        let baseline = runtime.get().await.unwrap();
        assert_eq!(baseline.generation, 0);

        let mut updated = baseline.config.clone();
        updated.agent.max_tokens_per_turn = 777;
        let after = runtime.set(updated, Some(0)).await.unwrap();
        assert_eq!(after.generation, 1);

        let conflict = runtime
            .patch(
                ConfigDelta(json!({"agent": {"max_tokens_per_turn": 1000}})),
                Some(0),
            )
            .await
            .unwrap_err();
        assert!(matches!(
            conflict,
            ConfigRuntimeError::GenerationConflict {
                expected: 0,
                current: 1
            }
        ));
    }

    /// Two runtimes sharing one state file: at most one write with the same expected
    /// generation succeeds, and the other fails with a known error.
    #[tokio::test]
    async fn concurrent_writes_with_same_expected_generation_conflict() {
        let temp = tempfile::tempdir().unwrap();
        let store: Arc<dyn ConfigStore> = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime_a = Arc::new(ConfigRuntime::new(
            Arc::clone(&store),
            temp.path().join("state.json"),
        ));
        let runtime_b = Arc::new(ConfigRuntime::new(
            Arc::clone(&store),
            temp.path().join("state.json"),
        ));

        let task_a = {
            let runtime = Arc::clone(&runtime_a);
            tokio::spawn(async move {
                runtime
                    .patch(
                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": 111}})),
                        Some(0),
                    )
                    .await
            })
        };
        let task_b = {
            let runtime = Arc::clone(&runtime_b);
            tokio::spawn(async move {
                runtime
                    .patch(
                        ConfigDelta(json!({"agent": {"max_tokens_per_turn": 222}})),
                        Some(0),
                    )
                    .await
            })
        };

        let res_a = task_a.await.unwrap();
        let res_b = task_b.await.unwrap();

        let ok_count = usize::from(res_a.is_ok()) + usize::from(res_b.is_ok());
        let known_failure_count = usize::from(matches!(
            res_a,
            Err(ConfigRuntimeError::GenerationConflict { .. } | ConfigRuntimeError::LockTimeout)
        )) + usize::from(matches!(
            res_b,
            Err(ConfigRuntimeError::GenerationConflict { .. } | ConfigRuntimeError::LockTimeout)
        ));
        assert!(ok_count <= 1);
        assert_eq!(known_failure_count + ok_count, 2);
    }

    /// A lock file left behind by a crashed holder is reclaimed once it is stale.
    #[tokio::test]
    async fn stale_lock_file_is_reclaimed() {
        let temp = tempfile::tempdir().unwrap();
        let state_path = temp.path().join("state.json");
        let lock_path = state_path.with_extension("lock");

        let abandoned = std::fs::File::create(&lock_path).unwrap();
        abandoned
            .set_modified(SystemTime::now() - LOCK_STALE_AFTER * 4)
            .unwrap();
        drop(abandoned);

        let store = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime = ConfigRuntime::new(store, state_path);
        let snapshot = runtime.get().await.unwrap();
        assert_eq!(snapshot.generation, 0);
        assert!(!lock_path.exists());
    }

    /// After a successful write, only the state file remains: the lock file has been
    /// released and the temp file has been renamed into place.
    #[tokio::test]
    async fn writes_leave_only_the_state_file() {
        let temp = tempfile::tempdir().unwrap();
        let store = Arc::new(MemoryConfigStore::new(Config::default()));
        let runtime = ConfigRuntime::new(store, temp.path().join("state.json"));

        runtime.set(Config::default(), None).await.unwrap();

        let mut names: Vec<String> = std::fs::read_dir(temp.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name().to_string_lossy().into_owned())
            .collect();
        names.sort();
        assert_eq!(names, vec!["state.json".to_string()]);
        assert_eq!(runtime.get().await.unwrap().generation, 1);
    }

    /// `Public` hides resolved filesystem paths; `Diagnostic` exposes them.
    #[test]
    fn config_envelope_policy_controls_resolved_paths_exposure() {
        let snapshot = ConfigSnapshot {
            config: Config::default(),
            generation: 7,
            metadata: Some(ConfigStoreMetadata {
                realm_id: Some("team".to_string()),
                instance_id: Some("instance".to_string()),
                backend: Some("sqlite".to_string()),
                resolved_paths: Some(ConfigResolvedPaths {
                    root: "/tmp/root".to_string(),
                    manifest_path: "/tmp/root/realm_manifest.json".to_string(),
                    config_path: "/tmp/root/config.toml".to_string(),
                    sessions_sqlite_path: Some("/tmp/root/sessions.sqlite3".to_string()),
                    sessions_jsonl_dir: "/tmp/root/sessions_jsonl".to_string(),
                }),
            }),
        };

        let public = ConfigEnvelope::from_snapshot(snapshot.clone(), ConfigEnvelopePolicy::Public);
        assert!(public.resolved_paths.is_none());

        let diagnostic = ConfigEnvelope::from_snapshot(snapshot, ConfigEnvelopePolicy::Diagnostic);
        assert!(diagnostic.resolved_paths.is_some());
    }
}