Skip to main content

agent_orchestrator/persistence/repository/
config.rs

1use crate::config::OrchestratorConfig;
2use crate::config_load::{ConfigSelfHealChange, ResourceRemoval, now_ts};
3use crate::dto::ConfigOverview;
4use crate::resource::export_manifest_resources;
5use crate::secret_store_crypto::{
6    SecretEncryption, decrypt_resource_spec_json, encrypt_resource_spec_json, ensure_secret_key,
7    load_existing_secret_key, redact_secret_data_map, resolve_data_dir_from_db_path,
8};
9use anyhow::{Context, Result};
10use rusqlite::{OptionalExtension, Transaction, params};
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13
14#[derive(Debug, Clone, serde::Serialize)]
15/// One persisted config self-heal log entry.
16pub struct HealLogEntry {
17    /// Config version associated with the heal event.
18    pub version: i64,
19    /// Original validation error that triggered the heal.
20    pub original_error: String,
21    /// Workflow identifier containing the healed step.
22    pub workflow_id: String,
23    /// Step identifier affected by the heal.
24    pub step_id: String,
25    /// Stable self-heal rule label.
26    pub rule: String,
27    /// Human-readable change detail.
28    pub detail: String,
29    /// Timestamp when the heal log row was created.
30    pub created_at: String,
31}
32
/// Persistence interface for versioned orchestrator configuration snapshots.
///
/// Implementations must be `Send + Sync`. The tuple layouts documented on the
/// methods below describe what the SQLite implementation in this file returns.
pub trait ConfigRepository: Send + Sync {
    /// Loads the latest config snapshot, or falls back to an initial one when absent.
    ///
    /// Returns `(config, yaml, version, updated_at)`. The SQLite implementation
    /// answers with `OrchestratorConfig::default()` and version `0` when nothing
    /// is persisted; it does not write that default back to the database.
    fn load_or_seed_config(&self) -> Result<(OrchestratorConfig, String, i64, String)>;
    /// Loads the latest persisted config snapshot without seeding.
    ///
    /// Returns `Ok(None)` when nothing is persisted, otherwise
    /// `(config, version, timestamp)`.
    fn load_config(&self) -> Result<Option<(OrchestratorConfig, i64, String)>>;
    /// Returns aggregate information about the latest self-heal run for the current version.
    ///
    /// The result is `(version, original_error, change_count, created_at)`, or
    /// `None` when there is no heal log entry or the newest entry belongs to a
    /// version other than `current_config_version`.
    fn query_latest_heal_summary(
        &self,
        current_config_version: i64,
    ) -> Result<Option<(i64, String, usize, String)>>;
    /// Returns recent self-heal log entries, newest first, at most `limit` of them.
    fn query_heal_log_entries(&self, limit: usize) -> Result<Vec<HealLogEntry>>;
    /// Persists a self-healed config snapshot and its detailed change log.
    ///
    /// Returns the `(version, created_at)` pair assigned to the new snapshot.
    fn persist_self_heal_snapshot(
        &self,
        yaml: &str,
        json_raw: &str,
        original_error: &str,
        changes: &[ConfigSelfHealChange],
    ) -> Result<(i64, String)>;
    /// Persists a normalized config snapshot without resource deletions.
    ///
    /// `author` is recorded with the new version. The returned overview carries
    /// the config that was passed in, the YAML text, and the new version and timestamp.
    fn persist_raw_config(
        &self,
        normalized: OrchestratorConfig,
        yaml: &str,
        json_raw: &str,
        author: &str,
    ) -> Result<ConfigOverview>;
    /// Persists a normalized config snapshot and records resource deletions.
    ///
    /// `deleted_resources` lists the resources to remove as part of the same write.
    fn persist_config_with_deletions(
        &self,
        normalized: OrchestratorConfig,
        yaml: &str,
        json_raw: &str,
        author: &str,
        deleted_resources: &[ResourceRemoval],
    ) -> Result<ConfigOverview>;
}
72
/// SQLite-backed implementation of the config repository.
///
/// Only the database path is stored; each operation opens its own connection.
pub struct SqliteConfigRepository {
    /// Location of the SQLite database file this repository reads and writes.
    db_path: PathBuf,
}
77
78impl SqliteConfigRepository {
79    /// Creates a config repository that reads and writes the given SQLite database.
80    pub fn new(db_path: impl AsRef<Path>) -> Self {
81        Self {
82            db_path: db_path.as_ref().to_path_buf(),
83        }
84    }
85
86    fn open_conn(&self) -> Result<rusqlite::Connection> {
87        crate::db::open_conn(&self.db_path)
88    }
89}
90
91fn serialize_config_snapshot(config: &OrchestratorConfig) -> Result<(String, String)> {
92    let sanitized = sanitized_config_snapshot(config);
93    let yaml = export_manifest_resources(&sanitized)
94        .iter()
95        .map(crate::resource::Resource::to_yaml)
96        .collect::<Result<Vec<_>>>()?
97        .join("---\n");
98    let json_raw = serde_json::to_string(&sanitized)?;
99    Ok((yaml, json_raw))
100}
101
102fn sanitized_config_snapshot(config: &OrchestratorConfig) -> OrchestratorConfig {
103    let mut sanitized = config.clone();
104    for project in sanitized.projects.values_mut() {
105        for store in project.secret_stores.values_mut() {
106            for value in store.data.values_mut() {
107                *value = crate::secret_store_crypto::ENCRYPTED_PLACEHOLDER.to_string();
108            }
109        }
110    }
111    for resource in sanitized.resource_store.resources_mut().values_mut() {
112        if resource.kind != "SecretStore" {
113            continue;
114        }
115        if let Some(spec) = resource.spec.as_object_mut() {
116            if let Some(data) = spec.get_mut("data").and_then(|value| value.as_object_mut()) {
117                redact_secret_data_map(data);
118            }
119        }
120    }
121    sanitized
122}
123
124fn emit_decrypt_failed_audit(
125    conn: &rusqlite::Connection,
126    project: &str,
127    name: &str,
128    error: &anyhow::Error,
129) {
130    // Best-effort: if the audit table doesn't exist yet, skip silently
131    let _ = crate::secret_key_audit::insert_key_audit_event(
132        conn,
133        &crate::secret_key_audit::KeyAuditEvent {
134            event_kind: crate::secret_key_audit::KeyAuditEventKind::DecryptFailed,
135            key_id: "unknown".to_string(),
136            key_fingerprint: "unknown".to_string(),
137            actor: "system:load_resources".to_string(),
138            detail_json: serde_json::json!({
139                "project": project,
140                "name": name,
141                "error": error.to_string(),
142            })
143            .to_string(),
144            created_at: now_ts(),
145        },
146    );
147}
148
149pub(crate) fn persist_config_versioned(
150    tx: &Transaction<'_>,
151    yaml: &str,
152    json_raw: &str,
153    author: &str,
154) -> Result<(i64, String)> {
155    let current_version: i64 = tx.query_row(
156        "SELECT COALESCE(MAX(version), 0) FROM orchestrator_config_versions",
157        [],
158        |row| row.get(0),
159    )?;
160    let next_version = current_version + 1;
161    let now = now_ts();
162    tx.execute(
163        "INSERT INTO orchestrator_config_versions (version, config_yaml, config_json, created_at, author)
164         VALUES (?1, ?2, ?3, ?4, ?5)",
165        params![next_version, yaml, json_raw, now, author],
166    )?;
167    Ok((next_version, now))
168}
169
170pub(crate) fn persist_heal_log(
171    tx: &Transaction<'_>,
172    version: i64,
173    original_error: &str,
174    changes: &[ConfigSelfHealChange],
175) -> Result<()> {
176    let now = now_ts();
177    for change in changes {
178        tx.execute(
179            "INSERT INTO config_heal_log (version, original_error, workflow_id, step_id, rule, detail, created_at)
180             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
181            params![
182                version,
183                original_error,
184                change.workflow_id,
185                change.step_id,
186                change.rule.as_label(),
187                change.detail,
188                now
189            ],
190        )?;
191    }
192    Ok(())
193}
194
195fn persist_resource(
196    tx: &Transaction<'_>,
197    cr: &crate::crd::types::CustomResource,
198    author: &str,
199    secret_encryption: &SecretEncryption,
200) -> Result<()> {
201    let project = cr
202        .metadata
203        .project
204        .as_deref()
205        .filter(|project| !project.trim().is_empty())
206        .unwrap_or(crate::crd::store::SYSTEM_PROJECT);
207
208    // RuntimePolicy is project-scoped but also has a system-level default in
209    // _system that serves as fallback for projects without their own policy.
210    if crate::crd::store::is_project_scoped(&cr.kind)
211        && project == crate::crd::store::SYSTEM_PROJECT
212        && cr.kind != "RuntimePolicy"
213    {
214        anyhow::bail!(
215            "project-scoped resource {}/{} must have an explicit project, not _system",
216            cr.kind,
217            cr.metadata.name
218        );
219    }
220
221    let spec_json = encrypt_resource_spec_json(
222        secret_encryption,
223        &cr.kind,
224        project,
225        &cr.metadata.name,
226        &cr.spec,
227    )?;
228    let metadata_json = serde_json::to_string(&cr.metadata)?;
229    let now = now_ts();
230
231    tx.execute(
232        "INSERT INTO resources (kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at)
233         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
234         ON CONFLICT(kind, project, name) DO UPDATE SET
235           api_version=excluded.api_version,
236           spec_json=excluded.spec_json,
237           metadata_json=excluded.metadata_json,
238           generation=generation+1,
239           updated_at=excluded.updated_at",
240        params![
241            cr.kind,
242            project,
243            cr.metadata.name,
244            cr.api_version,
245            spec_json,
246            metadata_json,
247            cr.generation,
248            cr.created_at,
249            now
250        ],
251    )?;
252
253    let next_version: i64 = tx.query_row(
254        "SELECT COALESCE(MAX(version), 0) + 1 FROM resource_versions WHERE kind=?1 AND project=?2 AND name=?3",
255        params![cr.kind, project, cr.metadata.name],
256        |row| row.get(0),
257    )?;
258    tx.execute(
259        "INSERT INTO resource_versions (kind, project, name, spec_json, metadata_json, version, author, created_at)
260         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
261        params![
262            cr.kind,
263            project,
264            cr.metadata.name,
265            spec_json,
266            metadata_json,
267            next_version,
268            author,
269            now
270        ],
271    )?;
272    Ok(())
273}
274
275fn persist_all_resources(
276    tx: &Transaction<'_>,
277    store: &crate::crd::store::ResourceStore,
278    crds: &HashMap<String, crate::crd::types::CustomResourceDefinition>,
279    author: &str,
280    secret_encryption: &SecretEncryption,
281) -> Result<()> {
282    for cr in store.resources().values() {
283        persist_resource(tx, cr, author, secret_encryption)?;
284    }
285    let now = now_ts();
286    for (kind_name, crd) in crds {
287        let spec_json = serde_json::to_string(crd)?;
288        tx.execute(
289            "INSERT INTO resources (kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at)
290             VALUES ('CustomResourceDefinition', ?1, ?2, 'orchestrator.dev/v2', ?3, '{}', 1, ?4, ?5)
291             ON CONFLICT(kind, project, name) DO UPDATE SET
292               spec_json=excluded.spec_json, generation=generation+1, updated_at=excluded.updated_at",
293            params![crate::crd::store::SYSTEM_PROJECT, kind_name, spec_json, now, now],
294        )?;
295    }
296    Ok(())
297}
298
299fn delete_resource_row(
300    tx: &Transaction<'_>,
301    kind: &str,
302    project: &str,
303    name: &str,
304    author: &str,
305) -> Result<bool> {
306    let deleted = tx.execute(
307        "DELETE FROM resources WHERE kind=?1 AND project=?2 AND name=?3",
308        params![kind, project, name],
309    )? > 0;
310    if deleted {
311        let now = now_ts();
312        tx.execute(
313            "INSERT INTO resource_versions (kind, project, name, spec_json, metadata_json, version, author, created_at)
314             VALUES (?1, ?2, ?3, '\"deleted\"', '{}', -1, ?4, ?5)",
315            params![kind, project, name, author, now],
316        )?;
317    }
318    Ok(deleted)
319}
320
/// Reads every row of the `resources` table and splits it into a resource
/// store and a map of custom resource definitions keyed by row name.
///
/// Returns an empty store and map when the `resources` table is not found.
/// Rows of kind `CustomResourceDefinition` are parsed from `spec_json`; all
/// other rows go through `decrypt_resource_spec_json` and are put into the store.
///
/// # Errors
///
/// Fails when the data directory cannot be resolved, the fallback key lookup
/// fails, the connection cannot be opened, a row cannot be read, or a
/// resource spec cannot be decrypted. A decrypt failure on a `SecretStore`
/// additionally emits a best-effort audit event before the error is returned.
fn load_all_resources(
    db_path: &Path,
) -> Result<(
    crate::crd::store::ResourceStore,
    HashMap<String, crate::crd::types::CustomResourceDefinition>,
)> {
    let data_dir = resolve_data_dir_from_db_path(db_path)?;
    // Try loading via KeyRing for multi-key support; fall back to single-key
    // when the keyring fails to load or has no active key. The result is an
    // Option: `None` means no decryption key is available.
    let secret_encryption = match crate::secret_key_lifecycle::load_keyring(&data_dir, db_path) {
        Ok(keyring) => {
            if keyring.has_active_key() {
                // NOTE(review): a `from_keyring` error is discarded here and
                // leaves no key at all, with no single-key fallback — confirm
                // this is intended.
                SecretEncryption::from_keyring(&keyring).ok()
            } else {
                load_existing_secret_key(&data_dir)?.map(SecretEncryption::from_key)
            }
        }
        Err(_) => load_existing_secret_key(&data_dir)?.map(SecretEncryption::from_key),
    };
    let conn = crate::db::open_conn(db_path)?;
    // Any error from this probe is treated the same as "table does not exist".
    // NOTE(review): that also hides real database errors behind an empty
    // result — confirm this is acceptable.
    let table_exists: bool = conn
        .query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='resources'",
            [],
            |row| row.get(0),
        )
        .unwrap_or(false);
    if !table_exists {
        return Ok((crate::crd::store::ResourceStore::default(), HashMap::new()));
    }

    let mut store = crate::crd::store::ResourceStore::default();
    let mut crds = HashMap::new();
    let mut stmt = conn.prepare(
        "SELECT kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at
         FROM resources",
    )?;
    // Pull each row out as a plain tuple, in SELECT column order.
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, String>(4)?,
            row.get::<_, String>(5)?,
            row.get::<_, i64>(6)?,
            row.get::<_, String>(7)?,
            row.get::<_, String>(8)?,
        ))
    })?;

    for row in rows {
        let (
            kind,
            project,
            name,
            api_version,
            spec_json,
            metadata_json,
            generation,
            created_at,
            updated_at,
        ) = row?;
        // CRD rows carry the serialized definition in spec_json. A row that
        // fails to parse is skipped without an error.
        if kind == "CustomResourceDefinition" {
            if let Ok(crd) =
                serde_json::from_str::<crate::crd::types::CustomResourceDefinition>(&spec_json)
            {
                crds.insert(name, crd);
            }
            continue;
        }

        let spec = match decrypt_resource_spec_json(
            secret_encryption.as_ref(),
            &kind,
            &project,
            &name,
            &spec_json,
        ) {
            Ok(v) => v,
            Err(e) => {
                // For SecretStore rows, write a DecryptFailed audit event
                // (best-effort) before reporting the error.
                if kind == "SecretStore" {
                    emit_decrypt_failed_audit(&conn, &project, &name, &e);
                    // The missing-key case is detected by matching on the error text.
                    // NOTE(review): this depends on the exact wording of errors
                    // produced elsewhere — verify the two phrases stay in sync.
                    let inner = e.to_string();
                    if inner.contains("secret key is unavailable")
                        || inner.contains("no decryption key")
                    {
                        return Err(e).with_context(|| format!(
                            "SecretStore write blocked: cannot load {project}/{name} — no active encryption key (run `orchestrator secret key list` to check key state)"
                        ));
                    }
                }
                return Err(e)
                    .with_context(|| format!("failed to load resource {kind}/{project}/{name}"));
            }
        };

        // If the stored metadata cannot be parsed, rebuild a minimal version
        // from the row's own columns; the system project maps to `None`.
        let metadata: crate::cli_types::ResourceMetadata = serde_json::from_str(&metadata_json)
            .unwrap_or_else(|_| crate::cli_types::ResourceMetadata {
                name: name.clone(),
                project: if project == crate::crd::store::SYSTEM_PROJECT {
                    None
                } else {
                    Some(project.clone())
                },
                labels: None,
                annotations: None,
            });

        store.put(crate::crd::types::CustomResource {
            kind,
            api_version,
            metadata,
            spec,
            // NOTE(review): `as` would wrap a negative stored generation into
            // a very large u64 — presumably the column is never negative; confirm.
            generation: generation as u64,
            created_at,
            updated_at,
        });
    }

    Ok((store, crds))
}
443
444fn query_max_resource_version(db_path: &Path) -> Result<i64> {
445    let conn = crate::db::open_conn(db_path)?;
446    let table_exists: bool = conn
447        .query_row(
448            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='resource_versions'",
449            [],
450            |row| row.get(0),
451        )
452        .unwrap_or(false);
453    if !table_exists {
454        return Ok(0);
455    }
456    let version: i64 = conn.query_row(
457        "SELECT COALESCE(MAX(version), 0) FROM resource_versions WHERE version > 0",
458        [],
459        |row| row.get(0),
460    )?;
461    Ok(version)
462}
463
464fn load_config_from_resources_table(
465    db_path: &Path,
466) -> Result<Option<(OrchestratorConfig, i64, String)>> {
467    let (store, crds) = load_all_resources(db_path)?;
468    if store.is_empty() {
469        return Ok(None);
470    }
471    let mut config = OrchestratorConfig {
472        resource_store: store,
473        custom_resource_definitions: crds,
474        ..Default::default()
475    };
476    crate::crd::writeback::reconcile_all_builtins(&mut config);
477    for kind in [
478        "Agent",
479        "Workflow",
480        "Workspace",
481        "StepTemplate",
482        "ExecutionProfile",
483        "EnvStore",
484        "SecretStore",
485    ] {
486        let resources: Vec<(Option<String>, String)> = config
487            .resource_store
488            .list_by_kind(kind)
489            .iter()
490            .map(|cr| (cr.metadata.project.clone(), cr.metadata.name.clone()))
491            .collect();
492        for (project, name) in resources {
493            crate::crd::writeback::reconcile_single_resource(
494                &mut config,
495                kind,
496                project.as_deref(),
497                &name,
498            );
499        }
500    }
501    // Populate custom_resources from resource_store for non-builtin CRD kinds
502    for crd_kind in config.custom_resource_definitions.keys() {
503        if crate::crd::resolve::is_builtin_kind(crd_kind) {
504            continue;
505        }
506        for cr in config.resource_store.list_by_kind(crd_kind) {
507            let storage_key = format!("{}/{}", cr.kind, cr.metadata.name);
508            config.custom_resources.insert(storage_key, cr.clone());
509        }
510    }
511    Ok(Some((
512        crate::config_load::normalize_config(config),
513        query_max_resource_version(db_path)?,
514        now_ts(),
515    )))
516}
517
518impl ConfigRepository for SqliteConfigRepository {
519    fn load_or_seed_config(&self) -> Result<(OrchestratorConfig, String, i64, String)> {
520        if let Some((config, version, updated_at)) = self.load_config()? {
521            let (yaml, _json_raw) = serialize_config_snapshot(&config)?;
522            return Ok((config, yaml, version, updated_at));
523        }
524
525        let config = OrchestratorConfig::default();
526        let (yaml, _json_raw) = serialize_config_snapshot(&config)?;
527        Ok((config, yaml, 0, now_ts()))
528    }
529
530    fn load_config(&self) -> Result<Option<(OrchestratorConfig, i64, String)>> {
531        load_config_from_resources_table(&self.db_path)
532    }
533
534    fn query_latest_heal_summary(
535        &self,
536        current_config_version: i64,
537    ) -> Result<Option<(i64, String, usize, String)>> {
538        let conn = self.open_conn()?;
539        let row: Option<(i64, String, String)> = conn
540            .query_row(
541                "SELECT version, original_error, created_at FROM config_heal_log ORDER BY id DESC LIMIT 1",
542                [],
543                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
544            )
545            .optional()?;
546
547        let Some((version, original_error, created_at)) = row else {
548            return Ok(None);
549        };
550        if version != current_config_version {
551            return Ok(None);
552        }
553
554        let count: usize = conn.query_row(
555            "SELECT COUNT(*) FROM config_heal_log WHERE version = ?1",
556            params![version],
557            |row| row.get(0),
558        )?;
559        Ok(Some((version, original_error, count, created_at)))
560    }
561
562    fn query_heal_log_entries(&self, limit: usize) -> Result<Vec<HealLogEntry>> {
563        let conn = self.open_conn()?;
564        let mut stmt = conn.prepare(
565            "SELECT version, original_error, workflow_id, step_id, rule, detail, created_at
566             FROM config_heal_log ORDER BY id DESC LIMIT ?1",
567        )?;
568        let rows = stmt.query_map(params![limit], |row| {
569            Ok(HealLogEntry {
570                version: row.get(0)?,
571                original_error: row.get(1)?,
572                workflow_id: row.get(2)?,
573                step_id: row.get(3)?,
574                rule: row.get(4)?,
575                detail: row.get(5)?,
576                created_at: row.get(6)?,
577            })
578        })?;
579        let mut entries = Vec::new();
580        for row in rows {
581            entries.push(row?);
582        }
583        Ok(entries)
584    }
585
586    fn persist_self_heal_snapshot(
587        &self,
588        yaml: &str,
589        json_raw: &str,
590        original_error: &str,
591        changes: &[ConfigSelfHealChange],
592    ) -> Result<(i64, String)> {
593        let conn = self.open_conn()?;
594        let tx = conn.unchecked_transaction()?;
595        let (version, created_at) = persist_config_versioned(&tx, yaml, json_raw, "self-heal")?;
596        persist_heal_log(&tx, version, original_error, changes)?;
597        tx.commit()?;
598        Ok((version, created_at))
599    }
600
601    fn persist_raw_config(
602        &self,
603        normalized: OrchestratorConfig,
604        yaml: &str,
605        json_raw: &str,
606        author: &str,
607    ) -> Result<ConfigOverview> {
608        let data_dir = resolve_data_dir_from_db_path(&self.db_path)?;
609        let secret_encryption =
610            SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?);
611        let conn = self.open_conn()?;
612        let tx = conn.unchecked_transaction()?;
613        let (version, updated_at) = persist_config_versioned(&tx, yaml, json_raw, author)?;
614        persist_all_resources(
615            &tx,
616            &normalized.resource_store,
617            &normalized.custom_resource_definitions,
618            author,
619            &secret_encryption,
620        )?;
621        tx.commit()?;
622        Ok(ConfigOverview {
623            config: normalized,
624            yaml: yaml.to_owned(),
625            version,
626            updated_at,
627        })
628    }
629
630    fn persist_config_with_deletions(
631        &self,
632        normalized: OrchestratorConfig,
633        yaml: &str,
634        json_raw: &str,
635        author: &str,
636        deleted_resources: &[ResourceRemoval],
637    ) -> Result<ConfigOverview> {
638        let data_dir = resolve_data_dir_from_db_path(&self.db_path)?;
639        let has_secret_stores = !normalized
640            .resource_store
641            .list_by_kind("SecretStore")
642            .is_empty();
643        let secret_encryption = match crate::secret_key_lifecycle::load_keyring(
644            &data_dir,
645            &self.db_path,
646        ) {
647            Ok(keyring) => {
648                if keyring.has_active_key() {
649                    SecretEncryption::from_keyring(&keyring)?
650                } else if has_secret_stores {
651                    anyhow::bail!(
652                        "SecretStore write blocked: no active encryption key (all keys revoked or retired)"
653                    );
654                } else {
655                    SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?)
656                }
657            }
658            Err(_) => SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?),
659        };
660        let conn = self.open_conn()?;
661        let tx = conn.unchecked_transaction()?;
662        crate::config_load::enforce_deletion_guards_for_removals(&tx, deleted_resources)?;
663        for deletion in deleted_resources {
664            let _ = delete_resource_row(
665                &tx,
666                &deletion.kind,
667                &deletion.project_id,
668                &deletion.name,
669                author,
670            )?;
671        }
672        let (version, updated_at) = persist_config_versioned(&tx, yaml, json_raw, author)?;
673        persist_all_resources(
674            &tx,
675            &normalized.resource_store,
676            &normalized.custom_resource_definitions,
677            author,
678            &secret_encryption,
679        )?;
680        tx.commit()?;
681        Ok(ConfigOverview {
682            config: normalized,
683            yaml: yaml.to_owned(),
684            version,
685            updated_at,
686        })
687    }
688}