Skip to main content

runmat_runtime/analysis/
storage.rs

1use std::collections::HashMap;
2use std::io::ErrorKind;
3use std::path::{Path, PathBuf};
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, OnceLock, RwLock};
6
7use chrono::Utc;
8use runmat_filesystem::{DirEntry, FsFileType};
9use serde::{Deserialize, Serialize};
10
11use super::contracts::{AnalysisArtifactRecord, AnalysisRunResult};
12
13const ARTIFACT_SCHEMA_VERSION: &str = "analysis_run_artifact/v1";
14
15#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
16struct PersistedRunArtifact {
17    schema_version: String,
18    created_at: String,
19    op_version: String,
20    run: AnalysisRunResult,
21}
22
23pub trait AnalysisArtifactStore: Send + Sync {
24    fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String>;
25    fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String>;
26    fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String>;
27}
28
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub enum AnalysisArtifactStoreConfig {
31    InMemory,
32    Filesystem { root: PathBuf },
33}
34
35#[derive(Debug, Clone, Default, PartialEq, Eq)]
36pub struct AnalysisArtifactRetentionConfig {
37    pub max_runs: Option<usize>,
38    pub max_runs_per_kind: Option<usize>,
39}
40
41pub struct InMemoryAnalysisArtifactStore {
42    runs: RwLock<HashMap<String, AnalysisRunResult>>,
43}
44
45impl InMemoryAnalysisArtifactStore {
46    pub fn new() -> Self {
47        Self {
48            runs: RwLock::new(HashMap::new()),
49        }
50    }
51}
52
53impl Default for InMemoryAnalysisArtifactStore {
54    fn default() -> Self {
55        Self::new()
56    }
57}
58
59impl AnalysisArtifactStore for InMemoryAnalysisArtifactStore {
60    fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
61        let mut guard = self
62            .runs
63            .write()
64            .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
65        guard.insert(run.run_id.clone(), run.clone());
66        Ok(AnalysisArtifactRecord {
67            run_id: run.run_id.clone(),
68            created_at: Utc::now().to_rfc3339(),
69            op_version: run_operation_version(run),
70            field_ids: super::analysis_run_field_ids(run),
71        })
72    }
73
74    fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
75        let guard = self
76            .runs
77            .read()
78            .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
79        Ok(guard.get(run_id).cloned())
80    }
81
82    fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String> {
83        let guard = self
84            .runs
85            .read()
86            .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
87        Ok(guard.values().cloned().collect())
88    }
89}
90
91pub struct FilesystemAnalysisArtifactStore {
92    root: PathBuf,
93}
94
95impl FilesystemAnalysisArtifactStore {
96    pub fn new(root: PathBuf) -> Self {
97        Self { root }
98    }
99
100    fn run_path(&self, run_id: &str) -> PathBuf {
101        self.root.join("runs").join(format!("{run_id}.json"))
102    }
103}
104
105impl AnalysisArtifactStore for FilesystemAnalysisArtifactStore {
106    fn persist_run(&self, run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
107        let path = self.run_path(&run.run_id);
108        if let Some(parent) = path.parent() {
109            fs_create_dir_all(parent)
110                .map_err(|err| format!("failed to create artifact directory: {err}"))?;
111        }
112        let op_version = run_operation_version(run);
113        let persisted = PersistedRunArtifact {
114            schema_version: ARTIFACT_SCHEMA_VERSION.to_string(),
115            created_at: Utc::now().to_rfc3339(),
116            op_version: op_version.clone(),
117            run: run.clone(),
118        };
119        let bytes = serde_json::to_vec_pretty(&persisted)
120            .map_err(|err| format!("failed to encode run artifact: {err}"))?;
121        atomic_write(&path, &bytes)?;
122        prune_filesystem_runs(&self.root)?;
123
124        Ok(AnalysisArtifactRecord {
125            run_id: run.run_id.clone(),
126            created_at: Utc::now().to_rfc3339(),
127            op_version,
128            field_ids: super::analysis_run_field_ids(run),
129        })
130    }
131
132    fn load_run(&self, run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
133        let path = self.run_path(run_id);
134        if !fs_exists(&path).map_err(|err| format!("failed to inspect run artifact: {err}"))? {
135            return Ok(None);
136        }
137        let bytes = fs_read(&path).map_err(|err| format!("failed to read run artifact: {err}"))?;
138        let run = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
139            Ok(persisted) => persisted.run,
140            Err(_) => serde_json::from_slice::<AnalysisRunResult>(&bytes)
141                .map_err(|err| format!("failed to parse run artifact: {err}"))?,
142        };
143        Ok(Some(run))
144    }
145
146    fn list_runs(&self) -> Result<Vec<AnalysisRunResult>, String> {
147        let runs_dir = self.root.join("runs");
148        if !fs_exists(&runs_dir).map_err(|err| format!("failed to inspect artifacts: {err}"))? {
149            return Ok(Vec::new());
150        }
151        let mut runs = Vec::new();
152        for entry in
153            fs_read_dir(&runs_dir).map_err(|err| format!("failed to scan artifacts: {err}"))?
154        {
155            let path = entry.path().to_path_buf();
156            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
157                continue;
158            }
159            let bytes =
160                fs_read(&path).map_err(|err| format!("failed to read run artifact: {err}"))?;
161            let parsed = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
162                Ok(persisted) => Some(persisted.run),
163                Err(_) => serde_json::from_slice::<AnalysisRunResult>(&bytes).ok(),
164            };
165            if let Some(run) = parsed {
166                runs.push(run);
167            }
168        }
169        Ok(runs)
170    }
171}
172
173fn run_operation_version(run: &AnalysisRunResult) -> String {
174    if run
175        .run
176        .diagnostics
177        .iter()
178        .any(|diag| diag.code == "FEA_ACOUSTIC_HARMONIC_RESPONSE")
179    {
180        "fea.run_acoustic/v1".to_string()
181    } else if run.electromagnetic_results.is_some()
182        || run
183            .run
184            .diagnostics
185            .iter()
186            .any(|diag| diag.code == "FEA_EM_STATIC")
187    {
188        "fea.run_electromagnetic/v1".to_string()
189    } else if run
190        .run
191        .diagnostics
192        .iter()
193        .any(|diag| diag.code == "FEA_CHT_COUPLING")
194    {
195        "fea.run_cht/v1".to_string()
196    } else if run
197        .run
198        .diagnostics
199        .iter()
200        .any(|diag| diag.code == "FEA_FSI_COUPLING")
201    {
202        "fea.run_fsi/v1".to_string()
203    } else if run
204        .run
205        .diagnostics
206        .iter()
207        .any(|diag| diag.code == "FEA_CFD_FLOW")
208    {
209        "fea.run_cfd/v1".to_string()
210    } else if run.nonlinear_results.is_some() {
211        "fea.run_nonlinear/v1".to_string()
212    } else if run.transient_results.is_some() {
213        "fea.run_transient/v1".to_string()
214    } else if run.modal_results.is_some() {
215        "fea.run_modal/v1".to_string()
216    } else {
217        "fea.run_linear_static/v1".to_string()
218    }
219}
220
221fn prune_filesystem_runs(root: &Path) -> Result<(), String> {
222    let retention = current_retention_config();
223    let max_runs = retention.max_runs.unwrap_or_else(|| {
224        std::env::var("RUNMAT_FEA_ARTIFACT_MAX_RUNS")
225            .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_MAX_RUNS"))
226            .ok()
227            .and_then(|value| value.parse::<usize>().ok())
228            .unwrap_or(0)
229    });
230    let max_runs_per_kind = retention.max_runs_per_kind.unwrap_or_else(|| {
231        std::env::var("RUNMAT_FEA_ARTIFACT_MAX_RUNS_PER_KIND")
232            .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_MAX_RUNS_PER_KIND"))
233            .ok()
234            .and_then(|value| value.parse::<usize>().ok())
235            .unwrap_or(0)
236    });
237    if max_runs == 0 && max_runs_per_kind == 0 {
238        return Ok(());
239    }
240
241    let runs_dir = root.join("runs");
242    if !fs_exists(&runs_dir).map_err(|err| format!("failed to inspect artifacts: {err}"))? {
243        return Ok(());
244    }
245    let mut artifacts = Vec::new();
246    for entry in fs_read_dir(&runs_dir).map_err(|err| format!("failed to scan artifacts: {err}"))? {
247        let path = entry.path().to_path_buf();
248        if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
249            continue;
250        }
251        let bytes = fs_read(&path).map_err(|err| format!("failed to read artifact file: {err}"))?;
252        let (op_version, run_id) = match serde_json::from_slice::<PersistedRunArtifact>(&bytes) {
253            Ok(persisted) => (persisted.op_version, persisted.run.run_id),
254            Err(_) => match serde_json::from_slice::<AnalysisRunResult>(&bytes) {
255                Ok(run) => (run_operation_version(&run), run.run_id),
256                Err(_) => continue,
257            },
258        };
259        let modified = fs_modified(&path).ok().flatten();
260        artifacts.push((path, op_version, run_id, modified));
261    }
262    artifacts.sort_by(|a, b| b.3.cmp(&a.3));
263
264    let mut to_remove = Vec::new();
265    if max_runs_per_kind > 0 {
266        let mut per_kind_counts: HashMap<String, usize> = HashMap::new();
267        for (path, op_version, _run_id, _modified) in &artifacts {
268            let count = per_kind_counts.entry(op_version.clone()).or_default();
269            *count += 1;
270            if *count > max_runs_per_kind {
271                to_remove.push(path.clone());
272            }
273        }
274    }
275    if max_runs > 0 {
276        for (index, (path, _op_version, _run_id, _modified)) in artifacts.iter().enumerate() {
277            if index >= max_runs {
278                to_remove.push(path.clone());
279            }
280        }
281    }
282    to_remove.sort();
283    to_remove.dedup();
284    for path in to_remove {
285        let _ = fs_remove_file(path);
286    }
287    Ok(())
288}
289
290fn global_store() -> &'static RwLock<Arc<dyn AnalysisArtifactStore>> {
291    static STORE: OnceLock<RwLock<Arc<dyn AnalysisArtifactStore>>> = OnceLock::new();
292    STORE.get_or_init(|| {
293        let default = store_from_config(config_from_env());
294        RwLock::new(default)
295    })
296}
297
298fn retention_config() -> &'static RwLock<AnalysisArtifactRetentionConfig> {
299    static CONFIG: OnceLock<RwLock<AnalysisArtifactRetentionConfig>> = OnceLock::new();
300    CONFIG.get_or_init(|| RwLock::new(AnalysisArtifactRetentionConfig::default()))
301}
302
303fn current_retention_config() -> AnalysisArtifactRetentionConfig {
304    retention_config()
305        .read()
306        .map(|guard| guard.clone())
307        .unwrap_or_default()
308}
309
310static NEXT_RUN_ID: AtomicU64 = AtomicU64::new(1);
311
312pub fn next_run_id() -> String {
313    let seq = NEXT_RUN_ID.fetch_add(1, Ordering::Relaxed);
314    format!("run_{}_{}", Utc::now().timestamp_millis(), seq)
315}
316
317pub fn persist_run_result(run: &AnalysisRunResult) -> Result<AnalysisArtifactRecord, String> {
318    let guard = global_store()
319        .read()
320        .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
321    guard.persist_run(run)
322}
323
324pub fn load_run_result(run_id: &str) -> Result<Option<AnalysisRunResult>, String> {
325    let guard = global_store()
326        .read()
327        .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
328    guard.load_run(run_id)
329}
330
331pub fn list_run_results() -> Result<Vec<AnalysisRunResult>, String> {
332    let guard = global_store()
333        .read()
334        .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
335    guard.list_runs()
336}
337
338pub fn configure_artifact_store(config: AnalysisArtifactStoreConfig) -> Result<(), String> {
339    let mut guard = global_store()
340        .write()
341        .map_err(|_| "analysis artifact store lock poisoned".to_string())?;
342    *guard = store_from_config(config);
343    Ok(())
344}
345
346pub fn configure_artifact_retention(config: AnalysisArtifactRetentionConfig) -> Result<(), String> {
347    let mut guard = retention_config()
348        .write()
349        .map_err(|_| "analysis artifact retention config lock poisoned".to_string())?;
350    *guard = config;
351    Ok(())
352}
353
354pub fn configure_artifact_store_from_env() -> Result<(), String> {
355    configure_artifact_store(config_from_env())
356}
357
358fn store_from_config(config: AnalysisArtifactStoreConfig) -> Arc<dyn AnalysisArtifactStore> {
359    match config {
360        AnalysisArtifactStoreConfig::InMemory => Arc::new(InMemoryAnalysisArtifactStore::new()),
361        AnalysisArtifactStoreConfig::Filesystem { root } => {
362            Arc::new(FilesystemAnalysisArtifactStore::new(root))
363        }
364    }
365}
366
367fn config_from_env() -> AnalysisArtifactStoreConfig {
368    let mode = std::env::var("RUNMAT_FEA_ARTIFACT_STORE")
369        .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_STORE"))
370        .unwrap_or_else(|_| "filesystem".to_string())
371        .to_lowercase();
372    if mode == "filesystem" {
373        let root = std::env::var("RUNMAT_FEA_ARTIFACT_ROOT")
374            .or_else(|_| std::env::var("RUNMAT_ANALYSIS_ARTIFACT_ROOT"))
375            .map(PathBuf::from)
376            .unwrap_or_else(|_| default_filesystem_artifact_root());
377        AnalysisArtifactStoreConfig::Filesystem { root }
378    } else {
379        AnalysisArtifactStoreConfig::InMemory
380    }
381}
382
383pub fn default_filesystem_artifact_root() -> PathBuf {
384    PathBuf::from("artifacts")
385}
386
387fn atomic_write(path: &PathBuf, bytes: &[u8]) -> Result<(), String> {
388    let tmp = path.with_extension(format!(
389        "tmp-{}-{}",
390        std::process::id(),
391        Utc::now().timestamp_nanos_opt().unwrap_or_default()
392    ));
393    fs_write(&tmp, bytes).map_err(|err| format!("failed to write temp artifact file: {err}"))?;
394    fs_rename(&tmp, path).map_err(|err| {
395        let _ = fs_remove_file(&tmp);
396        format!("failed to atomically replace run artifact: {err}")
397    })
398}
399
400fn fs_create_dir_all(path: impl Into<PathBuf>) -> std::io::Result<()> {
401    runmat_filesystem::create_dir_all(path.into())
402}
403
404fn fs_read(path: impl Into<PathBuf>) -> std::io::Result<Vec<u8>> {
405    runmat_filesystem::read(path.into())
406}
407
408fn fs_write(path: impl Into<PathBuf>, bytes: &[u8]) -> std::io::Result<()> {
409    runmat_filesystem::write(path.into(), bytes)
410}
411
412fn fs_remove_file(path: impl Into<PathBuf>) -> std::io::Result<()> {
413    match runmat_filesystem::remove_file(path.into()) {
414        Ok(()) => Ok(()),
415        Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
416        Err(err) => Err(err),
417    }
418}
419
420fn fs_rename(from: impl Into<PathBuf>, to: impl Into<PathBuf>) -> std::io::Result<()> {
421    runmat_filesystem::rename(from.into(), to.into())
422}
423
424fn fs_read_dir(path: impl Into<PathBuf>) -> std::io::Result<Vec<DirEntry>> {
425    runmat_filesystem::read_dir(path.into())
426}
427
428fn fs_exists(path: impl Into<PathBuf>) -> std::io::Result<bool> {
429    match runmat_filesystem::metadata(path.into()) {
430        Ok(metadata) => Ok(matches!(
431            metadata.file_type(),
432            FsFileType::Directory | FsFileType::File | FsFileType::Symlink | FsFileType::Other
433        )),
434        Err(err) if err.kind() == ErrorKind::NotFound => Ok(false),
435        Err(err) => Err(err),
436    }
437}
438
439fn fs_modified(path: impl Into<PathBuf>) -> std::io::Result<Option<std::time::SystemTime>> {
440    runmat_filesystem::metadata(path.into()).map(|metadata| metadata.modified())
441}
442
443#[cfg(test)]
444pub fn set_artifact_store_for_tests(store: Arc<dyn AnalysisArtifactStore>) {
445    let mut guard = global_store()
446        .write()
447        .expect("analysis artifact store lock poisoned");
448    *guard = store;
449}
450
451#[cfg(test)]
452pub fn reset_artifact_store_for_tests() {
453    let mut guard = global_store()
454        .write()
455        .expect("analysis artifact store lock poisoned");
456    *guard = Arc::new(InMemoryAnalysisArtifactStore::new());
457    *retention_config()
458        .write()
459        .expect("analysis artifact retention config lock poisoned") =
460        AnalysisArtifactRetentionConfig::default();
461}