1use std::collections::HashMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock, RwLock};
6
7use chrono::Utc;
8use runmat_filesystem::{DirEntry, FsFileType};
9use serde::{Deserialize, Serialize};
10
11use super::contracts::{AnalysisArtifactRecord, AnalysisRunResult};
12
/// Schema tag written into the `schema_version` field of every run artifact
/// persisted by the filesystem store in this file.
const ARTIFACT_SCHEMA_VERSION: &str = "analysis_run_artifact/v1";
14
/// Envelope serialized to JSON by the filesystem store: the run result plus
/// metadata describing when and by which operation it was produced.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct PersistedRunArtifact {
    /// Schema tag; the filesystem store writes `ARTIFACT_SCHEMA_VERSION` here.
    schema_version: String,
    /// RFC 3339 timestamp taken (in UTC) when the artifact was persisted.
    created_at: String,
    /// Operation version string as computed by `run_operation_version`.
    op_version: String,
    /// The wrapped analysis run result.
    run: AnalysisRunResult,
}
22
/// Storage backend for analysis run results.
///
/// Implementors must be `Send + Sync`; the module keeps the active store in a
/// process-wide `RwLock<Arc<dyn AnalysisArtifactStore>>`.
pub trait AnalysisArtifactStore: Send + Sync {
    /// Stores `run` and returns a record describing the stored artifact.
    ///
    /// Errors are reported as human-readable strings.
    fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String>;
    /// Looks up a run by its id, returning `Ok(None)` when no such run is stored.
    fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String>;
    /// Returns every stored run; the implementations in this file make no
    /// ordering guarantee.
    fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String>;
}
28
/// Selects which artifact store backend `store_from_config` constructs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AnalysisArtifactStoreConfig {
    /// Keep runs in process memory (`InMemoryAnalysisArtifactStore`).
    InMemory,
    /// Keep runs as JSON files under `<root>/runs`
    /// (`FilesystemAnalysisArtifactStore`).
    Filesystem { root: PathBuf },
}
34
/// Retention limits consulted when the filesystem store prunes old artifacts.
///
/// For each field, `None` makes pruning fall back to the corresponding
/// environment variables, and an effective value of `0` disables that limit.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AnalysisArtifactRetentionConfig {
    /// Maximum number of run artifacts kept in total.
    pub max_runs: Option<usize>,
    /// Maximum number of run artifacts kept per operation version string.
    pub max_runs_per_kind: Option<usize>,
}
40
41pub struct InMemoryAnalysisArtifactStore {
42 runs: RwLock<HashMap<String, AnalysisRunResult>>,
43}
44
45impl InMemoryAnalysisArtifactStore {
46 pub fn new() -> Self {
47 Self {
48 runs: RwLock::new(HashMap::new()),
49 }
50 }
51}
52
53impl Default for InMemoryAnalysisArtifactStore {
54 fn default() -> Self {
55 Self::new()
56 }
57}
58
59impl AnalysisArtifactStore for InMemoryAnalysisArtifactStore {
60 fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
61 let mut guard = self
62 .runs
63 .write()
64 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
65 guard.insert(run.run_id.clone(), run.clone());
66 Ok(AnalysisArtifactRecord {
67 run_id: run.run_id.clone(),
68 created_at: Utc::now().to_rfc3339(),
69 op_version: run_operation_version(run),
70 field_ids: super::analysis_run_field_ids(run),
71 })
72 }
73
74 fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
75 let guard = self
76 .runs
77 .read()
78 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
79 Ok(guard.get(run_id).cloned())
80 }
81
82 fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String> {
83 let guard = self
84 .runs
85 .read()
86 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
87 Ok(guard.values().cloned().collect())
88 }
89}
90
/// Artifact store that keeps each run as a JSON file below a root directory.
pub struct FilesystemAnalysisArtifactStore {
    /// Directory under which the `runs` subdirectory lives.
    root: PathBuf,
}

impl FilesystemAnalysisArtifactStore {
    /// Creates a store rooted at `root`; nothing is touched on disk here.
    pub fn new(root: PathBuf) -> Self {
        Self { root }
    }

    /// Builds the artifact path `<root>/runs/<run_id>.json`.
    ///
    /// NOTE(review): `run_id` is interpolated into the file name as-is, so
    /// callers are presumably expected to pass ids without path separators.
    fn run_path(&self, run_id: &str) -> PathBuf {
        let mut artifact = self.root.join("runs");
        artifact.push(format!("{run_id}.json"));
        artifact
    }
}
104
105impl AnalysisArtifactStore for FilesystemAnalysisArtifactStore {
106 fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
107 let path = self.run_path(&run.run_id);
108 if let Some(parent) = path.parent() {
109 fs_create_dir_all(parent)
110 .map_err(|err| format!("failed to create artifact directory: {err}"))?;
111 }
112 let op_version = run_operation_version(run);
113 let persisted = PersistedRunArtifact {
114 schema_version: ARTIFACT_SCHEMA_VERSION.to_string(),
115 created_at: Utc::now().to_rfc3339(),
116 op_version: op_version.clone(),
117 run: run.clone(),
118 };
119 let bytes = serde_json::to_vec_pretty(&persisted)
120 .map_err(|err| format!("failed to encode run artifact: {err}"))?;
121 atomic_write(&path, &bytes)?;
122 prune_filesystem_runs(&self.root)?;
123
124 Ok(AnalysisArtifactRecord {
125 run_id: run.run_id.clone(),
126 created_at: Utc::now().to_rfc3339(),
127 op_version,
128 field_ids: super::analysis_run_field_ids(run),
129 })
130 }
131
132 fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
133 let path = self.run_path(run_id);
134 if !fs_exists(&path).map_err(|err| format!("failed to inspect run artifact: {err}"))? {
135 return Ok(None);
136 }
137 let bytes = fs_read(&path).map_err(|err| format!("failed to read run artifact: {err}"))?;
138 let run = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
139 Ok(persisted) => persisted.run,
140 Err(_) => serde_json::from_slice::<AnalysisRunResult>(&bytes)
141 .map_err(|err| format!("failed to parse run artifact: {err}"))?,
142 };
143 Ok(Some(run))
144 }
145
146 fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String> {
147 let runs_dir = self.root.join("runs");
148 if !fs_exists(&runs_dir).map_err(|err| format!("failed to inspect artifacts: {err}"))? {
149 return Ok(Vec::new());
150 }
151 let mut runs = Vec::new();
152 for entry in
153 fs_read_dir(&runs_dir).map_err(|err| format!("failed to scan artifacts: {err}"))?
154 {
155 let path = entry.path().to_path_buf();
156 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
157 continue;
158 }
159 let bytes =
160 fs_read(&path).map_err(|err| format!("failed to read run artifact: {err}"))?;
161 let parsed = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
162 Ok(persisted) => Some(persisted.run),
163 Err(_) => serde_json::from_slice::<AnalysisRunResult>(&bytes).ok(),
164 };
165 if let Some(run) = parsed {
166 runs.push(run);
167 }
168 }
169 Ok(runs)
170 }
171}
172
173fn run_operation_version(run: &AnalysisRunResult) -> String {
174 if run
175 .run
176 .diagnostics
177 .iter()
178 .any(|diag| diag.code == "FEA_ACOUSTIC_HARMONIC_RESPONSE")
179 {
180 "fea.run_acoustic/v1".to_string()
181 } else if run.electromagnetic_results.is_some()
182 || run
183 .run
184 .diagnostics
185 .iter()
186 .any(|diag| diag.code == "FEA_EM_STATIC")
187 {
188 "fea.run_electromagnetic/v1".to_string()
189 } else if run
190 .run
191 .diagnostics
192 .iter()
193 .any(|diag| diag.code == "FEA_CHT_COUPLING")
194 {
195 "fea.run_cht/v1".to_string()
196 } else if run
197 .run
198 .diagnostics
199 .iter()
200 .any(|diag| diag.code == "FEA_FSI_COUPLING")
201 {
202 "fea.run_fsi/v1".to_string()
203 } else if run
204 .run
205 .diagnostics
206 .iter()
207 .any(|diag| diag.code == "FEA_CFD_FLOW")
208 {
209 "fea.run_cfd/v1".to_string()
210 } else if run.nonlinear_results.is_some() {
211 "fea.run_nonlinear/v1".to_string()
212 } else if run.transient_results.is_some() {
213 "fea.run_transient/v1".to_string()
214 } else if run.modal_results.is_some() {
215 "fea.run_modal/v1".to_string()
216 } else {
217 "fea.run_linear_static/v1".to_string()
218 }
219}
220
221fn prune_filesystem_runs(root: &Path) -> Result<(), String> {
222 let retention = current_retention_config();
223 let max_runs = retention.max_runs.unwrap_or_else(|| {
224 std::env::var("RUNMAT_FEA_ARTIFACT_MAX_RUNS")
225 .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_MAX_RUNS"))
226 .ok()
227 .and_then(|value| value.parse::<usize>().ok())
228 .unwrap_or(0)
229 });
230 let max_runs_per_kind = retention.max_runs_per_kind.unwrap_or_else(|| {
231 std::env::var("RUNMAT_FEA_ARTIFACT_MAX_RUNS_PER_KIND")
232 .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_MAX_RUNS_PER_KIND"))
233 .ok()
234 .and_then(|value| value.parse::<usize>().ok())
235 .unwrap_or(0)
236 });
237 if max_runs == 0 && max_runs_per_kind == 0 {
238 return Ok(());
239 }
240
241 let runs_dir = root.join("runs");
242 if !fs_exists(&runs_dir).map_err(|err| format!("failed to inspect artifacts: {err}"))? {
243 return Ok(());
244 }
245 let mut artifacts = Vec::new();
246 for entry in fs_read_dir(&runs_dir).map_err(|err| format!("failed to scan artifacts: {err}"))? {
247 let path = entry.path().to_path_buf();
248 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
249 continue;
250 }
251 let bytes = fs_read(&path).map_err(|err| format!("failed to read artifact file: {err}"))?;
252 let (op_version, run_id) = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
253 Ok(persisted) => (persisted.op_version, persisted.run.run_id),
254 Err(_) => match serde_json::from_slice::<AnalysisRunResult>(&bytes) {
255 Ok(run) => (run_operation_version(&run), run.run_id),
256 Err(_) => continue,
257 },
258 };
259 let modified = fs_modified(&path).ok().flatten();
260 artifacts.push((path, op_version, run_id, modified));
261 }
262 artifacts.sort_by(|a, b| b.3.cmp(&a.3));
263
264 let mut to_remove = Vec::new();
265 if max_runs_per_kind > 0 {
266 let mut per_kind_counts: HashMap<String, usize> = HashMap::new();
267 for (path, op_version, _run_id, _modified) in &artifacts {
268 let count = per_kind_counts.entry(op_version.clone()).or_default();
269 *count += 1;
270 if *count > max_runs_per_kind {
271 to_remove.push(path.clone());
272 }
273 }
274 }
275 if max_runs > 0 {
276 for (index, (path, _op_version, _run_id, _modified)) in artifacts.iter().enumerate() {
277 if index >= max_runs {
278 to_remove.push(path.clone());
279 }
280 }
281 }
282 to_remove.sort();
283 to_remove.dedup();
284 for path in to_remove {
285 let _ = fs_remove_file(path);
286 }
287 Ok(())
288}
289
290fn global_store() -> &'static RwLock<Arc<dyn AnalysisArtifactStore>> {
291 static STORE: OnceLock<RwLock<Arc<dyn AnalysisArtifactStore>>> = OnceLock::new();
292 STORE.get_or_init(|| {
293 let default = store_from_config(config_from_env());
294 RwLock::new(default)
295 })
296}
297
298fn retention_config() -> &'static RwLock<AnalysisArtifactRetentionConfig> {
299 static CONFIG: OnceLock<RwLock<AnalysisArtifactRetentionConfig>> = OnceLock::new();
300 CONFIG.get_or_init(|| RwLock::new(AnalysisArtifactRetentionConfig::default()))
301}
302
303fn current_retention_config() -> AnalysisArtifactRetentionConfig {
304 retention_config()
305 .read()
306 .map(|guard| guard.clone())
307 .unwrap_or_default()
308}
309
310static NEXT_RUN_ID: AtomicU64 = AtomicU64::new(1);
311
312pub fn next_run_id() -> String {
313 let seq = NEXT_RUN_ID.fetch_add(1, Ordering::Relaxed);
314 format!("run_{}_{}", Utc::now().timestamp_millis(), seq)
315}
316
317pub fn persist_run_result(run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
318 let guard = global_store()
319 .read()
320 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
321 guard.persist_run(run)
322}
323
324pub fn load_run_result(run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
325 let guard = global_store()
326 .read()
327 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
328 guard.load_run(run_id)
329}
330
331pub fn list_run_results() -> Result<Vec<AnalysisRunResult>, String> {
332 let guard = global_store()
333 .read()
334 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
335 guard.list_runs()
336}
337
338pub fn configure_artifact_store(config: AnalysisArtifactStoreConfig) -> Result<(), String> {
339 let mut guard = global_store()
340 .write()
341 .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
342 *guard = store_from_config(config);
343 Ok(())
344}
345
346pub fn configure_artifact_retention(config: AnalysisArtifactRetentionConfig) -> Result<(), String> {
347 let mut guard = retention_config()
348 .write()
349 .map_err(|_| "analysis artifact retention config lock poisoned".to_string())?;
350 *guard = config;
351 Ok(())
352}
353
354pub fn configure_artifact_store_from_env() -> Result<(), String> {
355 configure_artifact_store(config_from_env())
356}
357
358fn store_from_config(config: AnalysisArtifactStoreConfig) -> Arc<dyn AnalysisArtifactStore> {
359 match config {
360 AnalysisArtifactStoreConfig::InMemory => Arc::new(InMemoryAnalysisArtifactStore::new()),
361 AnalysisArtifactStoreConfig::Filesystem { root } => {
362 Arc::new(FilesystemAnalysisArtifactStore::new(root))
363 }
364 }
365}
366
367fn config_from_env() -> AnalysisArtifactStoreConfig {
368 let mode = std::env::var("RUNMAT_FEA_ARTIFACT_STORE")
369 .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_STORE"))
370 .unwrap_or_else(|_| "filesystem".to_string())
371 .to_lowercase();
372 if mode == "filesystem" {
373 let root = std::env::var("RUNMAT_FEA_ARTIFACT_ROOT")
374 .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_ROOT"))
375 .map(PathBuf::from)
376 .unwrap_or_else(|_| default_filesystem_artifact_root());
377 AnalysisArtifactStoreConfig::Filesystem { root }
378 } else {
379 AnalysisArtifactStoreConfig::InMemory
380 }
381}
382
/// Returns the relative path `artifacts`, used as the filesystem store root
/// when no root is supplied through the environment.
pub fn default_filesystem_artifact_root() -> PathBuf {
    Path::new("artifacts").to_path_buf()
}
386
387fn atomic_write(path: &PathBuf, bytes: &[u8]) -> Result<(), String> {
388 let tmp = path.with_extension(format!(
389 "tmp-{}-{}",
390 std::process::id(),
391 Utc::now().timestamp_nanos_opt().unwrap_or_default()
392 ));
393 fs_write(&tmp, bytes).map_err(|err| format!("failed to write temp artifact file: {err}"))?;
394 fs_rename(&tmp, path).map_err(|err| {
395 let _ = fs_remove_file(&tmp);
396 format!("failed to atomically replace run artifact: {err}")
397 })
398}
399
400fn fs_create_dir_all(path: impl Into<PathBuf>) -> std::io::Result<()> {
401 runmat_filesystem::create_dir_all(path.into())
402}
403
404fn fs_read(path: impl Into<PathBuf>) -> std::io::Result<Vec<u8>> {
405 runmat_filesystem::read(path.into())
406}
407
408fn fs_write(path: impl Into<PathBuf>, bytes: &[u8]) -> std::io::Result<()> {
409 runmat_filesystem::write(path.into(), bytes)
410}
411
412fn fs_remove_file(path: impl Into<PathBuf>) -> std::io::Result<()> {
413 match runmat_filesystem::remove_file(path.into()) {
414 Ok(()) => Ok(()),
415 Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
416 Err(err) => Err(err),
417 }
418}
419
420fn fs_rename(from: impl Into<PathBuf>, to: impl Into<PathBuf>) -> std::io::Result<()> {
421 runmat_filesystem::rename(from.into(), to.into())
422}
423
424fn fs_read_dir(path: impl Into<PathBuf>) -> std::io::Result<Vec<DirEntry>> {
425 runmat_filesystem::read_dir(path.into())
426}
427
428fn fs_exists(path: impl Into<PathBuf>) -> std::io::Result<bool> {
429 match runmat_filesystem::metadata(path.into()) {
430 Ok(metadata) => Ok(matches!(
431 metadata.file_type(),
432 FsFileType::Directory | FsFileType::File | FsFileType::Symlink | FsFileType::Other
433 )),
434 Err(err) if err.kind() == ErrorKind::NotFound => Ok(false),
435 Err(err) => Err(err),
436 }
437}
438
439fn fs_modified(path: impl Into<PathBuf>) -> std::io::Result<Option<std::time::SystemTime>> {
440 runmat_filesystem::metadata(path.into()).map(|metadata| metadata.modified())
441}
442
/// Test helper: installs `store` as the global artifact store.
///
/// # Panics
/// Panics when the global store lock is poisoned.
#[cfg(test)]
pub fn set_artifact_store_for_tests(store: Arc<dyn AnalysisArtifactStore>) {
    *global_store()
        .write()
        .expect("analysis artifact store lock poisoned") = store;
}
450
/// Test helper: swaps in a fresh in-memory store and restores the default
/// retention configuration.
///
/// # Panics
/// Panics when either global lock is poisoned.
#[cfg(test)]
pub fn reset_artifact_store_for_tests() {
    let fresh_store: Arc<dyn AnalysisArtifactStore> =
        Arc::new(InMemoryAnalysisArtifactStore::new());
    let mut store_slot = global_store()
        .write()
        .expect("analysis artifact store lock poisoned");
    *store_slot = fresh_store;

    let mut retention_slot = retention_config()
        .write()
        .expect("analysis artifact retention config lock poisoned");
    *retention_slot = AnalysisArtifactRetentionConfig::default();
}