1use crate::config::OrchestratorConfig;
2use crate::config_load::{ConfigSelfHealChange, ResourceRemoval, now_ts};
3use crate::dto::ConfigOverview;
4use crate::resource::export_manifest_resources;
5use crate::secret_store_crypto::{
6 SecretEncryption, decrypt_resource_spec_json, encrypt_resource_spec_json, ensure_secret_key,
7 load_existing_secret_key, redact_secret_data_map, resolve_data_dir_from_db_path,
8};
9use anyhow::{Context, Result};
10use rusqlite::{OptionalExtension, Transaction, params};
11use std::collections::HashMap;
12use std::path::{Path, PathBuf};
13
14#[derive(Debug, Clone, serde::Serialize)]
15pub struct HealLogEntry {
17 pub version: i64,
19 pub original_error: String,
21 pub workflow_id: String,
23 pub step_id: String,
25 pub rule: String,
27 pub detail: String,
29 pub created_at: String,
31}
32
33pub trait ConfigRepository: Send + Sync {
35 fn load_or_seed_config(&self) -> Result<(OrchestratorConfig, String, i64, String)>;
37 fn load_config(&self) -> Result<Option<(OrchestratorConfig, i64, String)>>;
39 fn query_latest_heal_summary(
41 &self,
42 current_config_version: i64,
43 ) -> Result<Option<(i64, String, usize, String)>>;
44 fn query_heal_log_entries(&self, limit: usize) -> Result<Vec<HealLogEntry>>;
46 fn persist_self_heal_snapshot(
48 &self,
49 yaml: &str,
50 json_raw: &str,
51 original_error: &str,
52 changes: &[ConfigSelfHealChange],
53 ) -> Result<(i64, String)>;
54 fn persist_raw_config(
56 &self,
57 normalized: OrchestratorConfig,
58 yaml: &str,
59 json_raw: &str,
60 author: &str,
61 ) -> Result<ConfigOverview>;
62 fn persist_config_with_deletions(
64 &self,
65 normalized: OrchestratorConfig,
66 yaml: &str,
67 json_raw: &str,
68 author: &str,
69 deleted_resources: &[ResourceRemoval],
70 ) -> Result<ConfigOverview>;
71}
72
73pub struct SqliteConfigRepository {
75 db_path: PathBuf,
76}
77
78impl SqliteConfigRepository {
79 pub fn new(db_path: impl AsRef<Path>) -> Self {
81 Self {
82 db_path: db_path.as_ref().to_path_buf(),
83 }
84 }
85
86 fn open_conn(&self) -> Result<rusqlite::Connection> {
87 crate::db::open_conn(&self.db_path)
88 }
89}
90
91fn serialize_config_snapshot(config: &OrchestratorConfig) -> Result<(String, String)> {
92 let sanitized = sanitized_config_snapshot(config);
93 let yaml = export_manifest_resources(&sanitized)
94 .iter()
95 .map(crate::resource::Resource::to_yaml)
96 .collect::<Result<Vec<_>>>()?
97 .join("---\n");
98 let json_raw = serde_json::to_string(&sanitized)?;
99 Ok((yaml, json_raw))
100}
101
102fn sanitized_config_snapshot(config: &OrchestratorConfig) -> OrchestratorConfig {
103 let mut sanitized = config.clone();
104 for project in sanitized.projects.values_mut() {
105 for store in project.secret_stores.values_mut() {
106 for value in store.data.values_mut() {
107 *value = crate::secret_store_crypto::ENCRYPTED_PLACEHOLDER.to_string();
108 }
109 }
110 }
111 for resource in sanitized.resource_store.resources_mut().values_mut() {
112 if resource.kind != "SecretStore" {
113 continue;
114 }
115 if let Some(spec) = resource.spec.as_object_mut() {
116 if let Some(data) = spec.get_mut("data").and_then(|value| value.as_object_mut()) {
117 redact_secret_data_map(data);
118 }
119 }
120 }
121 sanitized
122}
123
124fn emit_decrypt_failed_audit(
125 conn: &rusqlite::Connection,
126 project: &str,
127 name: &str,
128 error: &anyhow::Error,
129) {
130 let _ = crate::secret_key_audit::insert_key_audit_event(
132 conn,
133 &crate::secret_key_audit::KeyAuditEvent {
134 event_kind: crate::secret_key_audit::KeyAuditEventKind::DecryptFailed,
135 key_id: "unknown".to_string(),
136 key_fingerprint: "unknown".to_string(),
137 actor: "system:load_resources".to_string(),
138 detail_json: serde_json::json!({
139 "project": project,
140 "name": name,
141 "error": error.to_string(),
142 })
143 .to_string(),
144 created_at: now_ts(),
145 },
146 );
147}
148
149pub(crate) fn persist_config_versioned(
150 tx: &Transaction<'_>,
151 yaml: &str,
152 json_raw: &str,
153 author: &str,
154) -> Result<(i64, String)> {
155 let current_version: i64 = tx.query_row(
156 "SELECT COALESCE(MAX(version), 0) FROM orchestrator_config_versions",
157 [],
158 |row| row.get(0),
159 )?;
160 let next_version = current_version + 1;
161 let now = now_ts();
162 tx.execute(
163 "INSERT INTO orchestrator_config_versions (version, config_yaml, config_json, created_at, author)
164 VALUES (?1, ?2, ?3, ?4, ?5)",
165 params![next_version, yaml, json_raw, now, author],
166 )?;
167 Ok((next_version, now))
168}
169
170pub(crate) fn persist_heal_log(
171 tx: &Transaction<'_>,
172 version: i64,
173 original_error: &str,
174 changes: &[ConfigSelfHealChange],
175) -> Result<()> {
176 let now = now_ts();
177 for change in changes {
178 tx.execute(
179 "INSERT INTO config_heal_log (version, original_error, workflow_id, step_id, rule, detail, created_at)
180 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
181 params![
182 version,
183 original_error,
184 change.workflow_id,
185 change.step_id,
186 change.rule.as_label(),
187 change.detail,
188 now
189 ],
190 )?;
191 }
192 Ok(())
193}
194
195fn persist_resource(
196 tx: &Transaction<'_>,
197 cr: &crate::crd::types::CustomResource,
198 author: &str,
199 secret_encryption: &SecretEncryption,
200) -> Result<()> {
201 let project = cr
202 .metadata
203 .project
204 .as_deref()
205 .filter(|project| !project.trim().is_empty())
206 .unwrap_or(crate::crd::store::SYSTEM_PROJECT);
207
208 if crate::crd::store::is_project_scoped(&cr.kind)
211 && project == crate::crd::store::SYSTEM_PROJECT
212 && cr.kind != "RuntimePolicy"
213 {
214 anyhow::bail!(
215 "project-scoped resource {}/{} must have an explicit project, not _system",
216 cr.kind,
217 cr.metadata.name
218 );
219 }
220
221 let spec_json = encrypt_resource_spec_json(
222 secret_encryption,
223 &cr.kind,
224 project,
225 &cr.metadata.name,
226 &cr.spec,
227 )?;
228 let metadata_json = serde_json::to_string(&cr.metadata)?;
229 let now = now_ts();
230
231 tx.execute(
232 "INSERT INTO resources (kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at)
233 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
234 ON CONFLICT(kind, project, name) DO UPDATE SET
235 api_version=excluded.api_version,
236 spec_json=excluded.spec_json,
237 metadata_json=excluded.metadata_json,
238 generation=generation+1,
239 updated_at=excluded.updated_at",
240 params![
241 cr.kind,
242 project,
243 cr.metadata.name,
244 cr.api_version,
245 spec_json,
246 metadata_json,
247 cr.generation,
248 cr.created_at,
249 now
250 ],
251 )?;
252
253 let next_version: i64 = tx.query_row(
254 "SELECT COALESCE(MAX(version), 0) + 1 FROM resource_versions WHERE kind=?1 AND project=?2 AND name=?3",
255 params![cr.kind, project, cr.metadata.name],
256 |row| row.get(0),
257 )?;
258 tx.execute(
259 "INSERT INTO resource_versions (kind, project, name, spec_json, metadata_json, version, author, created_at)
260 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
261 params![
262 cr.kind,
263 project,
264 cr.metadata.name,
265 spec_json,
266 metadata_json,
267 next_version,
268 author,
269 now
270 ],
271 )?;
272 Ok(())
273}
274
275fn persist_all_resources(
276 tx: &Transaction<'_>,
277 store: &crate::crd::store::ResourceStore,
278 crds: &HashMap<String, crate::crd::types::CustomResourceDefinition>,
279 author: &str,
280 secret_encryption: &SecretEncryption,
281) -> Result<()> {
282 for cr in store.resources().values() {
283 persist_resource(tx, cr, author, secret_encryption)?;
284 }
285 let now = now_ts();
286 for (kind_name, crd) in crds {
287 let spec_json = serde_json::to_string(crd)?;
288 tx.execute(
289 "INSERT INTO resources (kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at)
290 VALUES ('CustomResourceDefinition', ?1, ?2, 'orchestrator.dev/v2', ?3, '{}', 1, ?4, ?5)
291 ON CONFLICT(kind, project, name) DO UPDATE SET
292 spec_json=excluded.spec_json, generation=generation+1, updated_at=excluded.updated_at",
293 params![crate::crd::store::SYSTEM_PROJECT, kind_name, spec_json, now, now],
294 )?;
295 }
296 Ok(())
297}
298
299fn delete_resource_row(
300 tx: &Transaction<'_>,
301 kind: &str,
302 project: &str,
303 name: &str,
304 author: &str,
305) -> Result<bool> {
306 let deleted = tx.execute(
307 "DELETE FROM resources WHERE kind=?1 AND project=?2 AND name=?3",
308 params![kind, project, name],
309 )? > 0;
310 if deleted {
311 let now = now_ts();
312 tx.execute(
313 "INSERT INTO resource_versions (kind, project, name, spec_json, metadata_json, version, author, created_at)
314 VALUES (?1, ?2, ?3, '\"deleted\"', '{}', -1, ?4, ?5)",
315 params![kind, project, name, author, now],
316 )?;
317 }
318 Ok(deleted)
319}
320
321fn load_all_resources(
322 db_path: &Path,
323) -> Result<(
324 crate::crd::store::ResourceStore,
325 HashMap<String, crate::crd::types::CustomResourceDefinition>,
326)> {
327 let data_dir = resolve_data_dir_from_db_path(db_path)?;
328 let secret_encryption = match crate::secret_key_lifecycle::load_keyring(&data_dir, db_path) {
330 Ok(keyring) => {
331 if keyring.has_active_key() {
332 SecretEncryption::from_keyring(&keyring).ok()
333 } else {
334 load_existing_secret_key(&data_dir)?.map(SecretEncryption::from_key)
335 }
336 }
337 Err(_) => load_existing_secret_key(&data_dir)?.map(SecretEncryption::from_key),
338 };
339 let conn = crate::db::open_conn(db_path)?;
340 let table_exists: bool = conn
341 .query_row(
342 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='resources'",
343 [],
344 |row| row.get(0),
345 )
346 .unwrap_or(false);
347 if !table_exists {
348 return Ok((crate::crd::store::ResourceStore::default(), HashMap::new()));
349 }
350
351 let mut store = crate::crd::store::ResourceStore::default();
352 let mut crds = HashMap::new();
353 let mut stmt = conn.prepare(
354 "SELECT kind, project, name, api_version, spec_json, metadata_json, generation, created_at, updated_at
355 FROM resources",
356 )?;
357 let rows = stmt.query_map([], |row| {
358 Ok((
359 row.get::<_, String>(0)?,
360 row.get::<_, String>(1)?,
361 row.get::<_, String>(2)?,
362 row.get::<_, String>(3)?,
363 row.get::<_, String>(4)?,
364 row.get::<_, String>(5)?,
365 row.get::<_, i64>(6)?,
366 row.get::<_, String>(7)?,
367 row.get::<_, String>(8)?,
368 ))
369 })?;
370
371 for row in rows {
372 let (
373 kind,
374 project,
375 name,
376 api_version,
377 spec_json,
378 metadata_json,
379 generation,
380 created_at,
381 updated_at,
382 ) = row?;
383 if kind == "CustomResourceDefinition" {
384 if let Ok(crd) =
385 serde_json::from_str::<crate::crd::types::CustomResourceDefinition>(&spec_json)
386 {
387 crds.insert(name, crd);
388 }
389 continue;
390 }
391
392 let spec = match decrypt_resource_spec_json(
393 secret_encryption.as_ref(),
394 &kind,
395 &project,
396 &name,
397 &spec_json,
398 ) {
399 Ok(v) => v,
400 Err(e) => {
401 if kind == "SecretStore" {
403 emit_decrypt_failed_audit(&conn, &project, &name, &e);
404 let inner = e.to_string();
405 if inner.contains("secret key is unavailable")
406 || inner.contains("no decryption key")
407 {
408 return Err(e).with_context(|| format!(
409 "SecretStore write blocked: cannot load {project}/{name} — no active encryption key (run `orchestrator secret key list` to check key state)"
410 ));
411 }
412 }
413 return Err(e)
414 .with_context(|| format!("failed to load resource {kind}/{project}/{name}"));
415 }
416 };
417
418 let metadata: crate::cli_types::ResourceMetadata = serde_json::from_str(&metadata_json)
419 .unwrap_or_else(|_| crate::cli_types::ResourceMetadata {
420 name: name.clone(),
421 project: if project == crate::crd::store::SYSTEM_PROJECT {
422 None
423 } else {
424 Some(project.clone())
425 },
426 labels: None,
427 annotations: None,
428 });
429
430 store.put(crate::crd::types::CustomResource {
431 kind,
432 api_version,
433 metadata,
434 spec,
435 generation: generation as u64,
436 created_at,
437 updated_at,
438 });
439 }
440
441 Ok((store, crds))
442}
443
444fn query_max_resource_version(db_path: &Path) -> Result<i64> {
445 let conn = crate::db::open_conn(db_path)?;
446 let table_exists: bool = conn
447 .query_row(
448 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='resource_versions'",
449 [],
450 |row| row.get(0),
451 )
452 .unwrap_or(false);
453 if !table_exists {
454 return Ok(0);
455 }
456 let version: i64 = conn.query_row(
457 "SELECT COALESCE(MAX(version), 0) FROM resource_versions WHERE version > 0",
458 [],
459 |row| row.get(0),
460 )?;
461 Ok(version)
462}
463
464fn load_config_from_resources_table(
465 db_path: &Path,
466) -> Result<Option<(OrchestratorConfig, i64, String)>> {
467 let (store, crds) = load_all_resources(db_path)?;
468 if store.is_empty() {
469 return Ok(None);
470 }
471 let mut config = OrchestratorConfig {
472 resource_store: store,
473 custom_resource_definitions: crds,
474 ..Default::default()
475 };
476 crate::crd::writeback::reconcile_all_builtins(&mut config);
477 for kind in [
478 "Agent",
479 "Workflow",
480 "Workspace",
481 "StepTemplate",
482 "ExecutionProfile",
483 "EnvStore",
484 "SecretStore",
485 ] {
486 let resources: Vec<(Option<String>, String)> = config
487 .resource_store
488 .list_by_kind(kind)
489 .iter()
490 .map(|cr| (cr.metadata.project.clone(), cr.metadata.name.clone()))
491 .collect();
492 for (project, name) in resources {
493 crate::crd::writeback::reconcile_single_resource(
494 &mut config,
495 kind,
496 project.as_deref(),
497 &name,
498 );
499 }
500 }
501 for crd_kind in config.custom_resource_definitions.keys() {
503 if crate::crd::resolve::is_builtin_kind(crd_kind) {
504 continue;
505 }
506 for cr in config.resource_store.list_by_kind(crd_kind) {
507 let storage_key = format!("{}/{}", cr.kind, cr.metadata.name);
508 config.custom_resources.insert(storage_key, cr.clone());
509 }
510 }
511 Ok(Some((
512 crate::config_load::normalize_config(config),
513 query_max_resource_version(db_path)?,
514 now_ts(),
515 )))
516}
517
518impl ConfigRepository for SqliteConfigRepository {
519 fn load_or_seed_config(&self) -> Result<(OrchestratorConfig, String, i64, String)> {
520 if let Some((config, version, updated_at)) = self.load_config()? {
521 let (yaml, _json_raw) = serialize_config_snapshot(&config)?;
522 return Ok((config, yaml, version, updated_at));
523 }
524
525 let config = OrchestratorConfig::default();
526 let (yaml, _json_raw) = serialize_config_snapshot(&config)?;
527 Ok((config, yaml, 0, now_ts()))
528 }
529
530 fn load_config(&self) -> Result<Option<(OrchestratorConfig, i64, String)>> {
531 load_config_from_resources_table(&self.db_path)
532 }
533
534 fn query_latest_heal_summary(
535 &self,
536 current_config_version: i64,
537 ) -> Result<Option<(i64, String, usize, String)>> {
538 let conn = self.open_conn()?;
539 let row: Option<(i64, String, String)> = conn
540 .query_row(
541 "SELECT version, original_error, created_at FROM config_heal_log ORDER BY id DESC LIMIT 1",
542 [],
543 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
544 )
545 .optional()?;
546
547 let Some((version, original_error, created_at)) = row else {
548 return Ok(None);
549 };
550 if version != current_config_version {
551 return Ok(None);
552 }
553
554 let count: usize = conn.query_row(
555 "SELECT COUNT(*) FROM config_heal_log WHERE version = ?1",
556 params![version],
557 |row| row.get(0),
558 )?;
559 Ok(Some((version, original_error, count, created_at)))
560 }
561
562 fn query_heal_log_entries(&self, limit: usize) -> Result<Vec<HealLogEntry>> {
563 let conn = self.open_conn()?;
564 let mut stmt = conn.prepare(
565 "SELECT version, original_error, workflow_id, step_id, rule, detail, created_at
566 FROM config_heal_log ORDER BY id DESC LIMIT ?1",
567 )?;
568 let rows = stmt.query_map(params![limit], |row| {
569 Ok(HealLogEntry {
570 version: row.get(0)?,
571 original_error: row.get(1)?,
572 workflow_id: row.get(2)?,
573 step_id: row.get(3)?,
574 rule: row.get(4)?,
575 detail: row.get(5)?,
576 created_at: row.get(6)?,
577 })
578 })?;
579 let mut entries = Vec::new();
580 for row in rows {
581 entries.push(row?);
582 }
583 Ok(entries)
584 }
585
586 fn persist_self_heal_snapshot(
587 &self,
588 yaml: &str,
589 json_raw: &str,
590 original_error: &str,
591 changes: &[ConfigSelfHealChange],
592 ) -> Result<(i64, String)> {
593 let conn = self.open_conn()?;
594 let tx = conn.unchecked_transaction()?;
595 let (version, created_at) = persist_config_versioned(&tx, yaml, json_raw, "self-heal")?;
596 persist_heal_log(&tx, version, original_error, changes)?;
597 tx.commit()?;
598 Ok((version, created_at))
599 }
600
601 fn persist_raw_config(
602 &self,
603 normalized: OrchestratorConfig,
604 yaml: &str,
605 json_raw: &str,
606 author: &str,
607 ) -> Result<ConfigOverview> {
608 let data_dir = resolve_data_dir_from_db_path(&self.db_path)?;
609 let secret_encryption =
610 SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?);
611 let conn = self.open_conn()?;
612 let tx = conn.unchecked_transaction()?;
613 let (version, updated_at) = persist_config_versioned(&tx, yaml, json_raw, author)?;
614 persist_all_resources(
615 &tx,
616 &normalized.resource_store,
617 &normalized.custom_resource_definitions,
618 author,
619 &secret_encryption,
620 )?;
621 tx.commit()?;
622 Ok(ConfigOverview {
623 config: normalized,
624 yaml: yaml.to_owned(),
625 version,
626 updated_at,
627 })
628 }
629
630 fn persist_config_with_deletions(
631 &self,
632 normalized: OrchestratorConfig,
633 yaml: &str,
634 json_raw: &str,
635 author: &str,
636 deleted_resources: &[ResourceRemoval],
637 ) -> Result<ConfigOverview> {
638 let data_dir = resolve_data_dir_from_db_path(&self.db_path)?;
639 let has_secret_stores = !normalized
640 .resource_store
641 .list_by_kind("SecretStore")
642 .is_empty();
643 let secret_encryption = match crate::secret_key_lifecycle::load_keyring(
644 &data_dir,
645 &self.db_path,
646 ) {
647 Ok(keyring) => {
648 if keyring.has_active_key() {
649 SecretEncryption::from_keyring(&keyring)?
650 } else if has_secret_stores {
651 anyhow::bail!(
652 "SecretStore write blocked: no active encryption key (all keys revoked or retired)"
653 );
654 } else {
655 SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?)
656 }
657 }
658 Err(_) => SecretEncryption::from_key(ensure_secret_key(&data_dir, &self.db_path)?),
659 };
660 let conn = self.open_conn()?;
661 let tx = conn.unchecked_transaction()?;
662 crate::config_load::enforce_deletion_guards_for_removals(&tx, deleted_resources)?;
663 for deletion in deleted_resources {
664 let _ = delete_resource_row(
665 &tx,
666 &deletion.kind,
667 &deletion.project_id,
668 &deletion.name,
669 author,
670 )?;
671 }
672 let (version, updated_at) = persist_config_versioned(&tx, yaml, json_raw, author)?;
673 persist_all_resources(
674 &tx,
675 &normalized.resource_store,
676 &normalized.custom_resource_definitions,
677 author,
678 &secret_encryption,
679 )?;
680 tx.commit()?;
681 Ok(ConfigOverview {
682 config: normalized,
683 yaml: yaml.to_owned(),
684 version,
685 updated_at,
686 })
687 }
688}