1use std::collections::{BTreeMap, BTreeSet};
2use std::fs::{self, OpenOptions};
3use std::io::{ErrorKind, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6
7use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions};
8use omnigraph_compiler::SchemaMigrationPlan;
9use omnigraph_compiler::build_catalog;
10use omnigraph_compiler::query::parser::parse_query;
11use omnigraph_compiler::query::typecheck::typecheck_query_decl;
12use omnigraph_compiler::schema::parser::parse_schema;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use time::OffsetDateTime;
17use time::format_description::well_known::Rfc3339;
18use ulid::Ulid;
19
20pub mod failpoints;
21
22mod config;
23mod diff;
24mod serve;
25mod store;
26mod sweep;
27mod types;
28use config::{
29 QueriesDecl, future_field_diagnostics, graph_address, initial_import_state, load_desired,
30 normalize_policy_target, observe_declared_graphs, observe_live_graph, parse_cluster_config,
31 policy_address, preview_schema_migration, query_address, resolve_config_path,
32 resolve_query_decls, schema_address, state_resource_digests, validate_cluster_header,
33 validate_id, validate_query_source,
34};
35use diff::{
36 FailedGraphOrigin, ResourceKind, append_embedding_profile_changes,
37 append_policy_binding_changes, approved_resources, classify_changes, compute_approvals,
38 compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind,
39};
40pub use serve::{
41 ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_graph_ids,
42 cluster_root_for_graph_uri, read_serving_snapshot, read_serving_snapshot_from_storage,
43 resolve_graph_storage_uri,
44};
45use store::{ClusterStore, StateLockGuard, StateSnapshot};
46use sweep::{
47 mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars,
48 tombstone_graph_subtree, warn_pending_recovery_sidecars,
49};
50pub use types::*;
51use types::*;
52
/// File name of the declarative cluster configuration.
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
/// Directory name for graphs (presumably the parent of each graph root — verify
/// against `ClusterStore::graph_root`).
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";
/// Directory that holds the cluster's bookkeeping files; every `__cluster/...`
/// path below is nested inside it.
pub const CLUSTER_STATE_DIR: &str = "__cluster";
/// Path of the cluster state document (`state.json`). Also used as the
/// diagnostic address when apply finds no existing state.
pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
/// Path of the lock document (`lock.json`) inside the state directory.
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
/// Directory inside the state directory for resource data (presumably the
/// payloads written during apply — confirm against `ClusterStore`).
pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
/// Directory inside the state directory for recovery records (presumably the
/// recovery sidecars written around graph-moving operations — confirm).
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
/// Directory inside the state directory for approval records (presumably the
/// approval artifacts listed by `list_approval_artifacts` — confirm).
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
61
62fn store_for(config_dir: &Path, storage_root: Option<&str>) -> Result<ClusterStore, Diagnostic> {
65 match storage_root {
66 Some(root) => ClusterStore::for_storage_root(root),
67 None => Ok(ClusterStore::for_config_dir(config_dir)),
68 }
69}
70
71pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
72 let outcome = load_desired(config_dir.as_ref());
73 let (resource_digests, resources, dependencies) = match outcome.desired {
74 Some(desired) => (
75 desired.resource_digests,
76 desired.resources,
77 desired.dependencies,
78 ),
79 None => (BTreeMap::new(), Vec::new(), Vec::new()),
80 };
81 let ok = !has_errors(&outcome.diagnostics);
82
83 ValidateOutput {
84 ok,
85 config_dir: display_path(&outcome.config_dir),
86 config_file: display_path(&outcome.config_file),
87 resource_digests,
88 resources,
89 dependencies,
90 diagnostics: outcome.diagnostics,
91 }
92}
93
94pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
95 let outcome = load_desired(config_dir.as_ref());
96 let mut diagnostics = outcome.diagnostics;
97 let storage_root = outcome
98 .desired
99 .as_ref()
100 .and_then(|desired| desired.storage_root.clone());
101 let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
102 Ok(backend) => backend,
103 Err(diagnostic) => {
104 diagnostics.push(diagnostic);
105 ClusterStore::for_config_dir(&outcome.config_dir)
106 }
107 };
108 let mut observations = backend.observations();
109
110 let Some(desired) = outcome.desired else {
111 return PlanOutput {
112 ok: false,
113 config_dir: display_path(&outcome.config_dir),
114 desired_revision: DesiredRevision {
115 config_digest: None,
116 },
117 resource_digests: BTreeMap::new(),
118 dependencies: Vec::new(),
119 state_observations: observations,
120 changes: Vec::new(),
121 blast_radius: Vec::new(),
122 approvals_required: Vec::new(),
123 diagnostics,
124 };
125 };
126
127 if has_errors(&diagnostics) {
128 return PlanOutput {
129 ok: false,
130 config_dir: display_path(&desired.config_dir),
131 desired_revision: DesiredRevision {
132 config_digest: Some(desired.config_digest),
133 },
134 resource_digests: desired.resource_digests,
135 dependencies: desired.dependencies,
136 state_observations: observations,
137 changes: Vec::new(),
138 blast_radius: Vec::new(),
139 approvals_required: Vec::new(),
140 diagnostics,
141 };
142 }
143
144 let _lock_guard = if desired.state_lock {
145 match backend.acquire_lock("plan", &mut observations).await {
146 Ok(guard) => Some(guard),
147 Err(diagnostic) => {
148 diagnostics.push(diagnostic);
149 None
150 }
151 }
152 } else {
153 diagnostics.push(Diagnostic::warning(
154 "state_lock_disabled",
155 "state.lock",
156 "state.lock is false; plan read state without acquiring the cluster state lock",
157 ));
158 None
159 };
160
161 warn_pending_recovery_sidecars(&backend, &mut diagnostics).await;
164
165 let mut prior_resources = BTreeMap::new();
166 let mut prior_state: Option<ClusterState> = None;
167 if !has_errors(&diagnostics) {
168 match backend.read_state(&mut observations).await {
169 Ok(snapshot) => {
170 if let Some(state) = snapshot.state {
171 prior_resources = state_resource_digests(&state);
172 prior_state = Some(state);
173 }
174 }
175 Err(diagnostic) => diagnostics.push(diagnostic),
176 }
177 }
178
179 let mut changes = if has_errors(&diagnostics) {
180 Vec::new()
181 } else {
182 diff_resources(&prior_resources, &desired.resource_digests)
183 };
184 if !has_errors(&diagnostics) {
185 append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired);
186 append_embedding_profile_changes(&mut changes, prior_state.as_ref(), &desired);
187 }
188 let artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
191 let approved = approved_resources(
192 &artifacts,
193 &changes,
194 &desired.config_digest,
195 &mut diagnostics,
196 );
197 classify_changes(
198 &mut changes,
199 &desired.dependencies,
200 &BTreeSet::new(),
201 &approved,
202 );
203
204 for change in &mut changes {
207 if change.operation != PlanOperation::Update {
208 continue;
209 }
210 let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
211 continue;
212 };
213 let graph_uri = backend.graph_root(&graph_id);
214 let source_path = desired
215 .resources
216 .iter()
217 .find(|resource| resource.address == change.resource)
218 .and_then(|resource| resource.path.clone());
219 let preview = match source_path {
220 Some(path) => preview_schema_migration(&graph_uri, &path).await,
221 None => Err("no schema source recorded".to_string()),
222 };
223 match preview {
224 Ok(migration) => change.migration = Some(migration),
225 Err(err) => diagnostics.push(Diagnostic::warning(
226 "schema_preview_unavailable",
227 change.resource.clone(),
228 format!("could not preview the schema migration: {err}"),
229 )),
230 }
231 }
232 let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
233 let approvals_required = compute_approvals(&changes, &approved);
234 let ok = !has_errors(&diagnostics);
235
236 PlanOutput {
237 ok,
238 config_dir: display_path(&desired.config_dir),
239 desired_revision: DesiredRevision {
240 config_digest: Some(desired.config_digest),
241 },
242 resource_digests: desired.resource_digests,
243 dependencies: desired.dependencies,
244 state_observations: observations,
245 changes,
246 blast_radius,
247 approvals_required,
248 diagnostics,
249 }
250}
251
/// Options for a cluster apply run (see `apply_config_dir_with_options`).
#[derive(Debug, Clone, Default)]
pub struct ApplyOptions {
    /// Actor identity recorded on recovery sidecars, passed to schema applies
    /// and graph tombstones, and echoed in the apply output; `None` when not
    /// supplied.
    pub actor: Option<String>,
}
270
271pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
272 apply_config_dir_with_options(config_dir, ApplyOptions::default()).await
273}
274
275pub async fn apply_config_dir_with_options(
276 config_dir: impl AsRef<Path>,
277 options: ApplyOptions,
278) -> ApplyOutput {
279 let outcome = load_desired(config_dir.as_ref());
280 let mut diagnostics = outcome.diagnostics;
281 let storage_root = outcome
282 .desired
283 .as_ref()
284 .and_then(|desired| desired.storage_root.clone());
285 let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
286 Ok(backend) => backend,
287 Err(diagnostic) => {
288 diagnostics.push(diagnostic);
289 ClusterStore::for_config_dir(&outcome.config_dir)
290 }
291 };
292 let mut observations = backend.observations();
293
294 let actor_for_output = options.actor.clone();
295 let early_return = |config_dir: String,
296 config_digest: Option<String>,
297 observations: StateObservations,
298 changes: Vec<PlanChange>,
299 resource_statuses: BTreeMap<String, ResourceStatusRecord>,
300 diagnostics: Vec<Diagnostic>| {
301 ApplyOutput {
302 ok: !has_errors(&diagnostics),
303 config_dir,
304 actor: actor_for_output.clone(),
305 desired_revision: DesiredRevision { config_digest },
306 state_observations: observations,
307 changes,
308 applied_count: 0,
309 deferred_count: 0,
310 converged: false,
311 state_written: false,
312 resource_statuses,
313 diagnostics,
314 }
315 };
316
317 let Some(desired) = outcome.desired else {
318 return early_return(
319 display_path(&outcome.config_dir),
320 None,
321 observations,
322 Vec::new(),
323 BTreeMap::new(),
324 diagnostics,
325 );
326 };
327
328 if has_errors(&diagnostics) {
329 return early_return(
330 display_path(&desired.config_dir),
331 Some(desired.config_digest),
332 observations,
333 Vec::new(),
334 BTreeMap::new(),
335 diagnostics,
336 );
337 }
338
339 let _lock_guard = if desired.state_lock {
341 match backend.acquire_lock("apply", &mut observations).await {
342 Ok(guard) => Some(guard),
343 Err(diagnostic) => {
344 diagnostics.push(diagnostic);
345 None
346 }
347 }
348 } else {
349 diagnostics.push(Diagnostic::warning(
350 "state_lock_disabled",
351 "state.lock",
352 "state.lock is false; apply wrote state without acquiring the cluster state lock",
353 ));
354 None
355 };
356
357 if has_errors(&diagnostics) {
358 return early_return(
359 display_path(&desired.config_dir),
360 Some(desired.config_digest),
361 observations,
362 Vec::new(),
363 BTreeMap::new(),
364 diagnostics,
365 );
366 }
367
368 let snapshot = match backend.read_state(&mut observations).await {
369 Ok(snapshot) => snapshot,
370 Err(diagnostic) => {
371 diagnostics.push(diagnostic);
372 return early_return(
373 display_path(&desired.config_dir),
374 Some(desired.config_digest),
375 observations,
376 Vec::new(),
377 BTreeMap::new(),
378 diagnostics,
379 );
380 }
381 };
382 let expected_cas = snapshot.state_cas;
383 let Some(mut state) = snapshot.state else {
384 diagnostics.push(Diagnostic::error(
385 "state_missing",
386 CLUSTER_STATE_FILE,
387 "apply requires an existing state.json; run `cluster import` to bootstrap state",
388 ));
389 return early_return(
390 display_path(&desired.config_dir),
391 Some(desired.config_digest),
392 observations,
393 Vec::new(),
394 BTreeMap::new(),
395 diagnostics,
396 );
397 };
398
399 let before_value =
402 serde_json::to_value(&state).expect("cluster state must serialize deterministically");
403 let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
404
405 let prior_resources = state_resource_digests(&state);
406 let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
407 append_policy_binding_changes(&mut changes, Some(&state), &desired);
408 append_embedding_profile_changes(&mut changes, Some(&state), &desired);
409 let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
410 let approved = approved_resources(
411 &approval_artifacts,
412 &changes,
413 &desired.config_digest,
414 &mut diagnostics,
415 );
416 classify_changes(
417 &mut changes,
418 &desired.dependencies,
419 &sweep.pending_graphs,
420 &approved,
421 );
422
423 let approvals = compute_approvals(&changes, &approved);
427 let approval_violation = changes.iter().any(|change| {
428 change.disposition == Some(ApplyDisposition::Applied)
429 && approvals
430 .iter()
431 .any(|approval| approval.resource == change.resource && !approval.satisfied)
432 });
433 if approval_violation {
434 diagnostics.push(Diagnostic::error(
435 "apply_approval_invariant_violation",
436 "changes",
437 "an executable change requires approval; refusing to apply",
438 ));
439 return early_return(
440 display_path(&desired.config_dir),
441 Some(desired.config_digest),
442 observations,
443 changes,
444 state.resource_statuses,
445 diagnostics,
446 );
447 }
448
449 let source_paths: BTreeMap<&str, &str> = desired
454 .resources
455 .iter()
456 .filter_map(|resource| {
457 resource
458 .path
459 .as_deref()
460 .map(|path| (resource.address.as_str(), path))
461 })
462 .collect();
463 let graph_creates_to_run: Vec<String> = changes
464 .iter()
465 .filter(|change| {
466 change.disposition == Some(ApplyDisposition::Applied)
467 && change.operation == PlanOperation::Create
468 && matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
469 })
470 .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
471 .collect();
472 let mut completed_op_sidecars: Vec<String> = Vec::new();
473 let mut failed_graphs: BTreeMap<String, FailedGraphOrigin> = BTreeMap::new();
474 let mut graph_moving_aborted = false;
475 for graph_id in &graph_creates_to_run {
476 if graph_moving_aborted {
477 diagnostics.push(Diagnostic::warning(
479 "graph_create_skipped",
480 graph_address(graph_id),
481 "skipped after an earlier graph create failed in this run",
482 ));
483 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
484 continue;
485 }
486 let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
487 continue;
488 };
489 let graph_uri = backend.graph_root(graph_id);
490 let mut sidecar = RecoverySidecar {
491 schema_version: 1,
492 operation_id: Ulid::new().to_string(),
493 started_at: now_rfc3339(),
494 actor: options.actor.clone(),
495 kind: RecoverySidecarKind::GraphCreate,
496 graph_id: graph_id.clone(),
497 graph_uri: graph_uri.clone(),
498 observed_manifest_version: None,
499 expected_manifest_version: None,
500 desired_schema_digest: desired_graph.schema_digest.clone(),
501 state_cas_base: expected_cas.clone(),
502 approval_id: None,
503 };
504 let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
505 Ok(path) => path,
506 Err(diagnostic) => {
507 diagnostics.push(diagnostic);
508 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
509 graph_moving_aborted = true;
510 continue;
511 }
512 };
513 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE) {
514 diagnostics.push(diagnostic);
517 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
518 graph_moving_aborted = true;
519 continue;
520 }
521 let schema_source = source_paths
524 .get(schema_address(graph_id).as_str())
525 .ok_or_else(|| {
526 Diagnostic::error(
527 "graph_create_failed",
528 graph_address(graph_id),
529 "no schema source recorded for graph",
530 )
531 })
532 .and_then(|path| {
533 fs::read_to_string(Path::new(path)).map_err(|err| {
534 Diagnostic::error(
535 "graph_create_failed",
536 graph_address(graph_id),
537 format!("could not read schema source '{path}': {err}"),
538 )
539 })
540 })
541 .and_then(|source| {
542 if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
543 Ok(source)
544 } else {
545 Err(Diagnostic::error(
546 "resource_content_changed",
547 schema_address(graph_id),
548 "schema source changed while apply was running; re-run `cluster apply`",
549 ))
550 }
551 });
552 let schema_source = match schema_source {
553 Ok(source) => source,
554 Err(diagnostic) => {
555 diagnostics.push(diagnostic);
556 backend.delete_object(&sidecar_path).await; failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
558 graph_moving_aborted = true;
559 continue;
560 }
561 };
562 match Omnigraph::init(&graph_uri, &schema_source).await {
563 Ok(_) => {}
564 Err(err) => {
565 diagnostics.push(Diagnostic::error(
566 "graph_create_failed",
567 graph_address(graph_id),
568 format!("could not initialize graph at '{graph_uri}': {err}"),
569 ));
570 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
573 graph_moving_aborted = true;
574 continue;
575 }
576 }
577 if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await {
580 if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await {
581 sidecar.expected_manifest_version = Some(snapshot.version());
582 if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
583 diagnostics.push(diagnostic);
584 }
585 }
586 }
587 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE) {
591 diagnostics.push(diagnostic);
592 return early_return(
593 display_path(&desired.config_dir),
594 Some(desired.config_digest),
595 observations,
596 changes,
597 state.resource_statuses,
598 diagnostics,
599 );
600 }
601 completed_op_sidecars.push(sidecar_path);
602 }
603
604 let schema_updates_to_run: Vec<String> = changes
607 .iter()
608 .filter(|change| {
609 change.disposition == Some(ApplyDisposition::Applied)
610 && change.operation == PlanOperation::Update
611 && matches!(resource_kind(&change.resource), ResourceKind::Schema(_))
612 })
613 .filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string))
614 .collect();
615 for graph_id in &schema_updates_to_run {
616 if graph_moving_aborted {
617 diagnostics.push(Diagnostic::warning(
618 "schema_apply_skipped",
619 schema_address(graph_id),
620 "skipped after an earlier graph-moving operation failed in this run",
621 ));
622 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
623 continue;
624 }
625 let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
626 continue;
627 };
628 let graph_uri = backend.graph_root(graph_id);
629 let db = match Omnigraph::open(&graph_uri).await {
632 Ok(db) => db,
633 Err(err) => {
634 diagnostics.push(Diagnostic::error(
635 "schema_apply_failed",
636 schema_address(graph_id),
637 format!("could not open graph at '{graph_uri}': {err}"),
638 ));
639 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
640 graph_moving_aborted = true;
641 continue;
642 }
643 };
644 let schema_source = source_paths
648 .get(schema_address(graph_id).as_str())
649 .ok_or_else(|| {
650 Diagnostic::error(
651 "schema_apply_failed",
652 schema_address(graph_id),
653 "no schema source recorded for graph",
654 )
655 })
656 .and_then(|path| {
657 fs::read_to_string(Path::new(path)).map_err(|err| {
658 Diagnostic::error(
659 "schema_apply_failed",
660 schema_address(graph_id),
661 format!("could not read schema source '{path}': {err}"),
662 )
663 })
664 })
665 .and_then(|source| {
666 if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
667 Ok(source)
668 } else {
669 Err(Diagnostic::error(
670 "resource_content_changed",
671 schema_address(graph_id),
672 "schema source changed while apply was running; re-run `cluster apply`",
673 ))
674 }
675 });
676 let schema_source = match schema_source {
677 Ok(source) => source,
678 Err(diagnostic) => {
679 diagnostics.push(diagnostic);
680 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
681 graph_moving_aborted = true;
682 continue;
683 }
684 };
685 if let Err(err) = db
686 .preview_schema_apply_with_options(&schema_source, SchemaApplyOptions::default())
687 .await
688 {
689 diagnostics.push(Diagnostic::error(
690 "schema_apply_failed",
691 schema_address(graph_id),
692 format!("schema apply is not supported on '{graph_uri}': {err}"),
693 ));
694 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
695 graph_moving_aborted = true;
696 continue;
697 }
698 let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await {
699 Ok(snapshot) => Some(snapshot.version()),
700 Err(_) => None,
701 };
702 let recorded_schema_digest = state
703 .applied_revision
704 .resources
705 .get(&schema_address(graph_id))
706 .map(|entry| entry.digest.clone());
707 let mut sidecar = RecoverySidecar {
708 schema_version: 1,
709 operation_id: Ulid::new().to_string(),
710 started_at: now_rfc3339(),
711 actor: options.actor.clone(),
712 kind: RecoverySidecarKind::SchemaApply,
713 graph_id: graph_id.clone(),
714 graph_uri: graph_uri.clone(),
715 observed_manifest_version,
716 expected_manifest_version: None,
717 desired_schema_digest: desired_graph.schema_digest.clone(),
718 state_cas_base: expected_cas.clone(),
719 approval_id: None,
720 };
721 let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
722 Ok(path) => path,
723 Err(diagnostic) => {
724 diagnostics.push(diagnostic);
725 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
726 graph_moving_aborted = true;
727 continue;
728 }
729 };
730 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY) {
731 diagnostics.push(diagnostic);
734 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
735 graph_moving_aborted = true;
736 continue;
737 }
738 match db
741 .apply_schema_as(
742 &schema_source,
743 SchemaApplyOptions::default(),
744 options.actor.as_deref(),
745 )
746 .await
747 {
748 Ok(result) => {
749 sidecar.expected_manifest_version = Some(result.manifest_version);
750 if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
751 diagnostics.push(diagnostic);
752 }
753 }
754 Err(err) => {
755 diagnostics.push(Diagnostic::error(
756 "schema_apply_failed",
757 schema_address(graph_id),
758 format!("schema apply failed on '{graph_uri}': {err}"),
759 ));
760 if live_schema_matches_recorded_digest(
761 &graph_uri,
762 recorded_schema_digest.as_deref(),
763 observed_manifest_version,
764 )
765 .await
766 {
767 if let Err(err) = backend.try_delete_object(&sidecar_path).await {
772 diagnostics.push(Diagnostic::warning(
773 "recovery_sidecar_cleanup_failed",
774 sidecar_path.clone(),
775 format!(
776 "could not delete the stale recovery sidecar after a pre-movement \
777 schema-apply rejection; graph `{graph_id}` stays quarantined until \
778 a state-mutating cluster command sweeps it: {err}"
779 ),
780 ));
781 }
782 }
783 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
784 graph_moving_aborted = true;
785 continue;
786 }
787 }
788 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY) {
791 diagnostics.push(diagnostic);
792 return early_return(
793 display_path(&desired.config_dir),
794 Some(desired.config_digest),
795 observations,
796 changes,
797 state.resource_statuses,
798 diagnostics,
799 );
800 }
801 completed_op_sidecars.push(sidecar_path);
802 }
803
804 if !failed_graphs.is_empty() {
805 demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
806 }
807
808 for change in &changes {
809 match change.disposition {
810 Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning(
811 "apply_unsupported_change",
812 change.resource.clone(),
813 "graph/schema changes are not applied in this stage; they are deferred to the graph-lifecycle phase",
814 )),
815 Some(ApplyDisposition::Blocked) => diagnostics.push(Diagnostic::warning(
816 "apply_dependency_blocked",
817 change.resource.clone(),
818 format!(
819 "blocked by an unapplied or missing dependency ({})",
820 change.reason.as_deref().unwrap_or("dependency")
821 ),
822 )),
823 _ => {}
824 }
825 }
826
827 let errors_before_payloads = count_errors(&diagnostics);
832 for change in &changes {
833 if change.disposition != Some(ApplyDisposition::Applied)
834 || change.operation == PlanOperation::Delete
835 {
836 continue;
837 }
838 let kind = resource_kind(&change.resource);
839 let digest = change
840 .after_digest
841 .as_deref()
842 .expect("create/update always carries an after digest");
843 if ClusterStore::payload_relative(&kind, digest).is_none() {
844 continue;
845 }
846 let Some(source) = source_paths.get(change.resource.as_str()) else {
847 diagnostics.push(Diagnostic::error(
848 "resource_payload_write_error",
849 change.resource.clone(),
850 "no source file recorded for resource",
851 ));
852 continue;
853 };
854 if let Err(diagnostic) =
855 write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource)
856 .await
857 {
858 diagnostics.push(diagnostic);
859 }
860 }
861 if count_errors(&diagnostics) > errors_before_payloads {
862 return early_return(
863 display_path(&desired.config_dir),
864 Some(desired.config_digest),
865 observations,
866 changes,
867 state.resource_statuses,
868 diagnostics,
869 );
870 }
871
872 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE) {
876 diagnostics.push(diagnostic);
877 return early_return(
878 display_path(&desired.config_dir),
879 Some(desired.config_digest),
880 observations,
881 changes,
882 state.resource_statuses,
883 diagnostics,
884 );
885 }
886
887 let graph_deletes_to_run: Vec<String> = changes
890 .iter()
891 .filter(|change| {
892 change.disposition == Some(ApplyDisposition::Applied)
893 && change.operation == PlanOperation::Delete
894 && matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
895 })
896 .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
897 .collect();
898 let mut executed_deletes: Vec<(String, Option<String>)> = Vec::new(); let mut consumed_approval_ids: Vec<String> = Vec::new();
900 for graph_id in &graph_deletes_to_run {
901 if graph_moving_aborted {
902 diagnostics.push(Diagnostic::warning(
903 "graph_delete_skipped",
904 graph_address(graph_id),
905 "skipped after an earlier graph-moving operation failed in this run",
906 ));
907 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
908 continue;
909 }
910 let graph_addr = graph_address(graph_id);
911 let approval_id = approval_artifacts
913 .iter()
914 .map(|(_, artifact)| artifact)
915 .find(|artifact| {
916 artifact.consumed_at.is_none()
917 && artifact.resource == graph_addr
918 && artifact.bound_config_digest == desired.config_digest
919 })
920 .map(|artifact| artifact.approval_id.clone());
921 let graph_uri = backend.graph_root(graph_id);
922 let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await {
923 Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await {
924 Ok(snapshot) => Some(snapshot.version()),
925 Err(_) => None,
926 },
927 Err(_) => None, };
929 let sidecar = RecoverySidecar {
930 schema_version: 1,
931 operation_id: Ulid::new().to_string(),
932 started_at: now_rfc3339(),
933 actor: options.actor.clone(),
934 kind: RecoverySidecarKind::GraphDelete,
935 graph_id: graph_id.clone(),
936 graph_uri: graph_uri.clone(),
937 observed_manifest_version,
938 expected_manifest_version: None, desired_schema_digest: String::new(),
940 state_cas_base: expected_cas.clone(),
941 approval_id: approval_id.clone(),
942 };
943 let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
944 Ok(path) => path,
945 Err(diagnostic) => {
946 diagnostics.push(diagnostic);
947 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
948 graph_moving_aborted = true;
949 continue;
950 }
951 };
952 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE) {
953 diagnostics.push(diagnostic);
956 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
957 graph_moving_aborted = true;
958 continue;
959 }
960 match backend.delete_graph_root(&graph_uri).await {
963 Ok(()) => {}
964 Err(err) => {
965 diagnostics.push(Diagnostic::error(
966 "graph_delete_failed",
967 graph_addr.clone(),
968 format!("could not remove graph root '{graph_uri}': {err}"),
969 ));
970 failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
971 graph_moving_aborted = true;
972 continue;
973 }
974 }
975 if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE) {
978 diagnostics.push(diagnostic);
979 return early_return(
980 display_path(&desired.config_dir),
981 Some(desired.config_digest),
982 observations,
983 changes,
984 state.resource_statuses,
985 diagnostics,
986 );
987 }
988 executed_deletes.push((graph_id.clone(), approval_id.clone()));
989 if let Some(approval_id) = approval_id {
990 consumed_approval_ids.push(approval_id);
991 }
992 completed_op_sidecars.push(sidecar_path);
993 }
994 if !failed_graphs.is_empty() {
995 demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
996 }
997
998 let mut new_state = state.clone();
1002 for change in &changes {
1003 match change.disposition {
1004 Some(ApplyDisposition::Applied) => match change.operation {
1005 PlanOperation::Create | PlanOperation::Update => {
1006 new_state.applied_revision.resources.insert(
1007 change.resource.clone(),
1008 StateResource {
1009 digest: change
1010 .after_digest
1011 .clone()
1012 .expect("create/update always carries an after digest"),
1013 applies_to: desired.policy_bindings.get(&change.resource).cloned(),
1016 embedding_provider: None,
1017 embedding_profile: desired
1018 .embedding_providers
1019 .get(&change.resource)
1020 .cloned(),
1021 },
1022 );
1023 set_resource_status_applied(&mut new_state, &change.resource);
1024 }
1025 PlanOperation::Delete => {
1026 new_state
1027 .applied_revision
1028 .resources
1029 .remove(&change.resource);
1030 new_state.resource_statuses.remove(&change.resource);
1031 }
1032 },
1033 Some(ApplyDisposition::Blocked) => {
1034 if change.reason.as_deref() != Some("cluster_recovery_pending") {
1037 set_resource_status(
1038 &mut new_state,
1039 &change.resource,
1040 ResourceLifecycleStatus::Blocked,
1041 change.reason.as_deref().unwrap_or("dependency_not_applied"),
1042 "waiting on an unapplied or missing dependency",
1043 );
1044 }
1045 }
1046 _ => {}
1047 }
1048 }
1049 for (graph_id, approval_id) in &executed_deletes {
1050 tombstone_graph_subtree(
1051 &mut new_state,
1052 graph_id,
1053 approval_id.as_deref(),
1054 options.actor.as_deref(),
1055 );
1056 if let Some(approval_id) = approval_id {
1057 record_approval_consumed(&mut new_state, approval_id, "apply");
1058 }
1059 }
1060 recompute_state_graph_digests(&mut new_state, &desired);
1061
1062 let mut residual = diff_resources(
1063 &state_resource_digests(&new_state),
1064 &desired.resource_digests,
1065 );
1066 append_policy_binding_changes(&mut residual, Some(&new_state), &desired);
1067 append_embedding_profile_changes(&mut residual, Some(&new_state), &desired);
1068 let converged = residual.is_empty();
1069 if converged {
1070 new_state.applied_revision.config_digest = Some(desired.config_digest.clone());
1071 }
1072
1073 let after_value =
1074 serde_json::to_value(&new_state).expect("cluster state must serialize deterministically");
1075 let mut state_written = false;
1076 let mut state_write_failed = false;
1077 if after_value != before_value {
1078 new_state.state_revision = new_state.state_revision.saturating_add(1);
1079 let write_result = match failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE) {
1084 Ok(()) => {
1085 backend
1086 .write_state(&new_state, expected_cas.as_deref(), &mut observations)
1087 .await
1088 }
1089 Err(diagnostic) => Err(diagnostic),
1090 };
1091 match write_result {
1092 Ok(()) => state_written = true,
1093 Err(diagnostic) => {
1094 diagnostics.push(diagnostic);
1095 state_write_failed = true;
1096 }
1097 }
1098 }
1099 if !state_write_failed {
1102 for sidecar_uri in sweep
1103 .completed_sidecars
1104 .iter()
1105 .chain(completed_op_sidecars.iter())
1106 {
1107 backend.delete_object(sidecar_uri).await;
1108 }
1109 let mut all_consumed = sweep.consumed_approvals.clone();
1110 all_consumed.extend(consumed_approval_ids.iter().cloned());
1111 mark_approvals_consumed(&backend, &all_consumed).await;
1112 }
1113 let resource_statuses = if state_write_failed {
1118 state.resource_statuses
1119 } else {
1120 new_state.resource_statuses
1121 };
1122
1123 let applied_count = changes
1124 .iter()
1125 .filter(|change| change.disposition == Some(ApplyDisposition::Applied))
1126 .count();
1127 let deferred_count = changes
1128 .iter()
1129 .filter(|change| {
1130 matches!(
1131 change.disposition,
1132 Some(ApplyDisposition::Deferred) | Some(ApplyDisposition::Blocked)
1133 )
1134 })
1135 .count();
1136
1137 ApplyOutput {
1138 ok: !has_errors(&diagnostics),
1139 config_dir: display_path(&desired.config_dir),
1140 actor: options.actor.clone(),
1141 desired_revision: DesiredRevision {
1142 config_digest: Some(desired.config_digest),
1143 },
1144 state_observations: observations,
1145 changes,
1146 applied_count,
1147 deferred_count,
1148 converged,
1149 state_written,
1150 resource_statuses,
1151 diagnostics,
1152 }
1153}
1154
1155pub async fn approve_config_dir(
1161 config_dir: impl AsRef<Path>,
1162 resource: &str,
1163 approved_by: &str,
1164) -> ApproveOutput {
1165 let outcome = load_desired(config_dir.as_ref());
1166 let mut diagnostics = outcome.diagnostics;
1167 let storage_root = outcome
1168 .desired
1169 .as_ref()
1170 .and_then(|desired| desired.storage_root.clone());
1171 let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
1172 Ok(backend) => backend,
1173 Err(diagnostic) => {
1174 diagnostics.push(diagnostic);
1175 ClusterStore::for_config_dir(&outcome.config_dir)
1176 }
1177 };
1178 let mut observations = backend.observations();
1179
1180 let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
1181 ok: false,
1182 config_dir,
1183 approval_id: None,
1184 resource: None,
1185 operation: None,
1186 approved_by: None,
1187 diagnostics,
1188 };
1189
1190 let Some(desired) = outcome.desired else {
1191 return fail(display_path(&outcome.config_dir), diagnostics);
1192 };
1193 if has_errors(&diagnostics) {
1194 return fail(display_path(&desired.config_dir), diagnostics);
1195 }
1196
1197 let _lock_guard = if desired.state_lock {
1198 match backend.acquire_lock("approve", &mut observations).await {
1199 Ok(guard) => Some(guard),
1200 Err(diagnostic) => {
1201 diagnostics.push(diagnostic);
1202 return fail(display_path(&desired.config_dir), diagnostics);
1203 }
1204 }
1205 } else {
1206 diagnostics.push(Diagnostic::warning(
1207 "state_lock_disabled",
1208 "state.lock",
1209 "state.lock is false; approve ran without acquiring the cluster state lock",
1210 ));
1211 None
1212 };
1213
1214 let state = match backend.read_state(&mut observations).await {
1215 Ok(snapshot) => match snapshot.state {
1216 Some(state) => state,
1217 None => {
1218 diagnostics.push(Diagnostic::error(
1219 "state_missing",
1220 CLUSTER_STATE_FILE,
1221 "approve requires an existing state.json; run `cluster import` first",
1222 ));
1223 return fail(display_path(&desired.config_dir), diagnostics);
1224 }
1225 },
1226 Err(diagnostic) => {
1227 diagnostics.push(diagnostic);
1228 return fail(display_path(&desired.config_dir), diagnostics);
1229 }
1230 };
1231
1232 let prior_resources = state_resource_digests(&state);
1233 let changes = diff_resources(&prior_resources, &desired.resource_digests);
1234 let gates = compute_approvals(&changes, &BTreeSet::new());
1235 let Some(change) = changes.iter().find(|change| {
1236 change.resource == resource && gates.iter().any(|gate| gate.resource == resource)
1237 }) else {
1238 diagnostics.push(Diagnostic::error(
1239 "approval_not_required",
1240 resource,
1241 "no pending change for this resource requires approval (check `cluster plan`)",
1242 ));
1243 return fail(display_path(&desired.config_dir), diagnostics);
1244 };
1245
1246 let artifact = ApprovalArtifact {
1247 schema_version: 1,
1248 approval_id: Ulid::new().to_string(),
1249 resource: change.resource.clone(),
1250 operation: match change.operation {
1251 PlanOperation::Create => "create",
1252 PlanOperation::Update => "update",
1253 PlanOperation::Delete => "delete",
1254 }
1255 .to_string(),
1256 reason: gates
1257 .iter()
1258 .find(|gate| gate.resource == resource)
1259 .map(|gate| gate.reason.clone())
1260 .unwrap_or_default(),
1261 bound_config_digest: desired.config_digest.clone(),
1262 bound_before_digest: change.before_digest.clone(),
1263 bound_after_digest: change.after_digest.clone(),
1264 approved_by: approved_by.to_string(),
1265 created_at: now_rfc3339(),
1266 consumed_at: None,
1267 consumed_by_operation: None,
1268 };
1269 if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await {
1270 diagnostics.push(diagnostic);
1271 return fail(display_path(&desired.config_dir), diagnostics);
1272 }
1273
1274 ApproveOutput {
1275 ok: !has_errors(&diagnostics),
1276 config_dir: display_path(&desired.config_dir),
1277 approval_id: Some(artifact.approval_id),
1278 resource: Some(artifact.resource),
1279 operation: Some(change.operation.clone()),
1280 approved_by: Some(artifact.approved_by),
1281 diagnostics,
1282 }
1283}
1284
1285pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
1286 let parsed = parse_cluster_config(config_dir.as_ref());
1287 let mut diagnostics = parsed.diagnostics;
1288 let storage_root = parsed.raw.as_ref().and_then(|raw| {
1289 raw.storage
1290 .as_deref()
1291 .map(str::trim)
1292 .filter(|root| !root.is_empty())
1293 .map(|root| root.trim_end_matches('/').to_string())
1294 });
1295 let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
1296 Ok(backend) => backend,
1297 Err(diagnostic) => {
1298 diagnostics.push(diagnostic);
1299 ClusterStore::for_config_dir(&parsed.config_dir)
1300 }
1301 };
1302 let mut observations = backend.observations();
1303 backend
1304 .observe_lock(&mut observations, &mut diagnostics)
1305 .await;
1306 warn_pending_recovery_sidecars(&backend, &mut diagnostics).await;
1307
1308 let mut resource_digests = BTreeMap::new();
1309 let mut resource_statuses = BTreeMap::new();
1310 let mut state_observation_records = BTreeMap::new();
1311
1312 if let Some(raw) = parsed.raw.as_ref() {
1313 let _settings = validate_cluster_header(raw, &mut diagnostics);
1314 if !has_errors(&diagnostics) {
1315 match backend.read_state(&mut observations).await {
1316 Ok(snapshot) => {
1317 if let Some(state) = snapshot.state {
1318 for (address, finding) in verify_catalog_payloads(&backend, &state).await {
1322 diagnostics.push(payload_finding_diagnostic(&address, &finding));
1323 }
1324 resource_digests = state_resource_digests(&state);
1325 resource_statuses = state.resource_statuses;
1326 state_observation_records = state.observations;
1327 } else {
1328 diagnostics.push(Diagnostic::warning(
1329 "state_missing",
1330 CLUSTER_STATE_FILE,
1331 "state.json is missing; no applied cluster revision has been recorded",
1332 ));
1333 }
1334 }
1335 Err(diagnostic) => diagnostics.push(diagnostic),
1336 }
1337 }
1338 }
1339
1340 StatusOutput {
1341 ok: !has_errors(&diagnostics),
1342 config_dir: display_path(&parsed.config_dir),
1343 state_observations: observations,
1344 resource_digests,
1345 resource_statuses,
1346 observations: state_observation_records,
1347 diagnostics,
1348 }
1349}
1350
1351pub async fn force_unlock_config_dir(
1352 config_dir: impl AsRef<Path>,
1353 lock_id: impl AsRef<str>,
1354) -> ForceUnlockOutput {
1355 let parsed = parse_cluster_config(config_dir.as_ref());
1356 let mut diagnostics = parsed.diagnostics;
1357 let storage_root = parsed.raw.as_ref().and_then(|raw| {
1358 raw.storage
1359 .as_deref()
1360 .map(str::trim)
1361 .filter(|root| !root.is_empty())
1362 .map(|root| root.trim_end_matches('/').to_string())
1363 });
1364 let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
1365 Ok(backend) => backend,
1366 Err(diagnostic) => {
1367 diagnostics.push(diagnostic);
1368 ClusterStore::for_config_dir(&parsed.config_dir)
1369 }
1370 };
1371 let mut observations = backend.observations();
1372 let mut lock_removed = false;
1373
1374 if let Some(raw) = parsed.raw.as_ref() {
1375 let _settings = validate_cluster_header(raw, &mut diagnostics);
1376 if !has_errors(&diagnostics) {
1377 match backend
1378 .force_unlock(lock_id.as_ref(), &mut observations)
1379 .await
1380 {
1381 Ok(()) => lock_removed = true,
1382 Err(diagnostic) => diagnostics.push(diagnostic),
1383 }
1384 }
1385 }
1386
1387 ForceUnlockOutput {
1388 ok: !has_errors(&diagnostics),
1389 config_dir: display_path(&parsed.config_dir),
1390 state_observations: observations,
1391 lock_removed,
1392 diagnostics,
1393 }
1394}
1395
1396pub async fn refresh_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput {
1397 sync_config_dir(config_dir.as_ref(), StateSyncOperation::Refresh).await
1398}
1399
1400pub async fn import_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput {
1401 sync_config_dir(config_dir.as_ref(), StateSyncOperation::Import).await
1402}
1403
/// Shared implementation of `cluster refresh` and `cluster import`.
///
/// Loads the desired configuration from `config_dir`, takes the cluster state
/// lock when `state.lock` is enabled, and reads the current state:
/// - `Refresh` requires an existing `state.json` and increments its
///   `state_revision`.
/// - `Import` requires `state.json` to be absent, starts from
///   `initial_import_state`, and writes `state_revision` 1.
///
/// The state is then updated in four steps:
/// 1. Recovery sidecars are swept.
/// 2. Catalog payload blobs are verified. Missing or mismatched blobs drop the
///    resource from `applied_revision` and mark it `Drifted`; unreadable blobs
///    mark it `Error`.
/// 3. The declared graphs are observed.
/// 4. The result is written with `write_state`, passing the `state_cas` read
///    earlier as the expected value.
///
/// Completed sidecars are deleted, and swept approvals marked consumed, only
/// after that write succeeds.
///
/// `Import` returns without writing if any error diagnostic was produced.
/// `Refresh` still writes the refreshed state and reports `ok: false`.
async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
    let outcome = load_desired(config_dir);
    let mut diagnostics = outcome.diagnostics;
    let storage_root = outcome
        .desired
        .as_ref()
        .and_then(|desired| desired.storage_root.clone());
    // On a store error, fall back to the config-dir store so an output can
    // still be built; the pushed diagnostic fails the error check below.
    let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
        Ok(backend) => backend,
        Err(diagnostic) => {
            diagnostics.push(diagnostic);
            ClusterStore::for_config_dir(&outcome.config_dir)
        }
    };
    let mut observations = backend.observations();

    let Some(desired) = outcome.desired else {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&outcome.config_dir),
            state_observations: observations,
            resource_digests: BTreeMap::new(),
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    };

    // Config or store errors stop the run before any lock is taken.
    if has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: desired.resource_digests,
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    }

    let operation_label = state_sync_operation_label(operation);
    // The guard is held until the function returns. A failed acquisition only
    // records a diagnostic here; the error check below then aborts.
    let _lock_guard = if desired.state_lock {
        match backend
            .acquire_lock(operation_label, &mut observations)
            .await
        {
            Ok(guard) => Some(guard),
            Err(diagnostic) => {
                diagnostics.push(diagnostic);
                None
            }
        }
    } else {
        diagnostics.push(Diagnostic::warning(
            "state_lock_disabled",
            "state.lock",
            format!(
                "state.lock is false; {operation_label} wrote state without acquiring the cluster state lock"
            ),
        ));
        None
    };

    if has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: desired.resource_digests,
            resource_statuses: BTreeMap::new(),
            observations: BTreeMap::new(),
            diagnostics,
        };
    }

    let snapshot = match backend.read_state(&mut observations).await {
        Ok(snapshot) => snapshot,
        Err(diagnostic) => {
            diagnostics.push(diagnostic);
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: desired.resource_digests,
                resource_statuses: BTreeMap::new(),
                observations: BTreeMap::new(),
                diagnostics,
            };
        }
    };

    // Passed to `write_state` below as the expected CAS value for the state
    // object, presumably so a concurrent writer is detected; see
    // `ClusterStore::write_state`.
    let expected_cas = snapshot.state_cas;
    // Refresh needs an existing ledger; import refuses to overwrite one.
    let mut state = match (operation, snapshot.state) {
        (StateSyncOperation::Refresh, Some(state)) => state,
        (StateSyncOperation::Refresh, None) => {
            diagnostics.push(Diagnostic::error(
                "state_missing",
                CLUSTER_STATE_FILE,
                "refresh requires an existing state.json; run `cluster import` to bootstrap state",
            ));
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: BTreeMap::new(),
                resource_statuses: BTreeMap::new(),
                observations: BTreeMap::new(),
                diagnostics,
            };
        }
        (StateSyncOperation::Import, Some(state)) => {
            diagnostics.push(Diagnostic::error(
                "state_already_exists",
                CLUSTER_STATE_FILE,
                "import creates initial state only when state.json is missing; use `cluster refresh` for an existing state ledger",
            ));
            return StateSyncOutput {
                ok: false,
                operation,
                config_dir: display_path(&desired.config_dir),
                state_observations: observations,
                resource_digests: state_resource_digests(&state),
                resource_statuses: state.resource_statuses,
                observations: state.observations,
                diagnostics,
            };
        }
        (StateSyncOperation::Import, None) => initial_import_state(&desired),
    };

    // Sidecars this sweep reports as completed are deleted only after the
    // state write below succeeds.
    let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;

    for (address, finding) in verify_catalog_payloads(&backend, &state).await {
        diagnostics.push(payload_finding_diagnostic(&address, &finding));
        match finding {
            // Missing or corrupt blobs are no longer considered applied, so a
            // later plan will schedule them for republication.
            PayloadFinding::Missing => {
                state.applied_revision.resources.remove(&address);
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Drifted,
                    "payload_missing",
                    "catalog payload blob is missing; re-run `cluster apply` to republish",
                );
            }
            PayloadFinding::Mismatch { .. } => {
                state.applied_revision.resources.remove(&address);
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Drifted,
                    "payload_mismatch",
                    "catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish",
                );
            }
            // A read error says nothing about the blob's content, so the
            // applied record is kept and only the status is flagged.
            PayloadFinding::ReadError(error) => {
                set_resource_status(
                    &mut state,
                    &address,
                    ResourceLifecycleStatus::Error,
                    "payload_read_error",
                    &error,
                );
            }
        }
    }

    let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await;
    if graph_error_count > 0 {
        diagnostics.push(Diagnostic::error(
            "graph_observation_error",
            CLUSTER_GRAPHS_DIR,
            format!("{graph_error_count} graph observation(s) failed"),
        ));
    }

    // Import never writes a partially observed initial state; refresh falls
    // through and records what it observed even when errors were reported.
    if operation == StateSyncOperation::Import && has_errors(&diagnostics) {
        return StateSyncOutput {
            ok: false,
            operation,
            config_dir: display_path(&desired.config_dir),
            state_observations: observations,
            resource_digests: state_resource_digests(&state),
            resource_statuses: state.resource_statuses,
            observations: state.observations,
            diagnostics,
        };
    }

    if operation == StateSyncOperation::Import {
        state.state_revision = 1;
    } else {
        state.state_revision = state.state_revision.saturating_add(1);
    }

    match backend
        .write_state(&state, expected_cas.as_deref(), &mut observations)
        .await
    {
        Ok(()) => {
            // Cleanup runs only once the new state is durable.
            for sidecar_uri in &sweep.completed_sidecars {
                backend.delete_object(sidecar_uri).await;
            }
            mark_approvals_consumed(&backend, &sweep.consumed_approvals).await;
        }
        Err(diagnostic) => diagnostics.push(diagnostic),
    }

    let resource_digests = state_resource_digests(&state);
    let ok = !has_errors(&diagnostics);

    StateSyncOutput {
        ok,
        operation,
        config_dir: display_path(&desired.config_dir),
        state_observations: observations,
        resource_digests,
        resource_statuses: state.resource_statuses,
        observations: state.observations,
        diagnostics,
    }
}
1642
/// Problem detected by `verify_catalog_payloads` when checking a published
/// catalog payload blob against the digest recorded for it in cluster state.
#[derive(Debug, PartialEq, Eq)]
enum PayloadFinding {
    /// `read_payload` returned no blob for the recorded digest.
    Missing,
    /// A blob was read, but the SHA-256 of its text differs from the recorded
    /// digest; `actual_digest` is the hex digest of what was read.
    Mismatch { actual_digest: String },
    /// `read_payload` failed; carries the store's error message.
    ReadError(String),
}
1649
1650async fn verify_catalog_payloads(
1656 backend: &ClusterStore,
1657 state: &ClusterState,
1658) -> Vec<(String, PayloadFinding)> {
1659 let mut findings = Vec::new();
1660 for (address, resource) in &state.applied_revision.resources {
1661 let kind = resource_kind(address);
1662 if ClusterStore::payload_relative(&kind, &resource.digest).is_none() {
1663 continue;
1664 }
1665 match backend.read_payload(&kind, &resource.digest).await {
1666 Ok(Some(text)) => {
1667 let actual_digest = sha256_hex(text.as_bytes());
1668 if actual_digest != resource.digest {
1669 findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
1670 }
1671 }
1672 Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)),
1673 Err(err) => {
1674 findings.push((address.clone(), PayloadFinding::ReadError(err)));
1675 }
1676 }
1677 }
1678 findings
1679}
1680
1681fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic {
1682 match finding {
1683 PayloadFinding::Missing => Diagnostic::warning(
1684 "catalog_payload_missing",
1685 address,
1686 "catalog payload blob is missing; re-run `cluster apply` to republish",
1687 ),
1688 PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning(
1689 "catalog_payload_mismatch",
1690 address,
1691 format!(
1692 "catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish"
1693 ),
1694 ),
1695 PayloadFinding::ReadError(error) => {
1697 Diagnostic::error("catalog_payload_read_error", address, error.clone())
1698 }
1699 }
1700}
1701
1702async fn write_resource_payload(
1707 backend: &ClusterStore,
1708 kind: &ResourceKind,
1709 source: &Path,
1710 expected_digest: &str,
1711 resource: &str,
1712) -> Result<(), Diagnostic> {
1713 if backend.payload_exists(kind, expected_digest).await {
1714 return Ok(());
1716 }
1717 let bytes = fs::read(source).map_err(|err| {
1718 Diagnostic::error(
1719 "resource_payload_write_error",
1720 resource,
1721 format!(
1722 "could not read resource source '{}': {err}",
1723 source.display()
1724 ),
1725 )
1726 })?;
1727 if sha256_hex(&bytes) != expected_digest {
1728 return Err(Diagnostic::error(
1732 "resource_content_changed",
1733 resource,
1734 format!(
1735 "resource source '{}' changed while apply was running; re-run `cluster apply`",
1736 source.display()
1737 ),
1738 ));
1739 }
1740 let content = String::from_utf8(bytes).map_err(|err| {
1741 Diagnostic::error(
1742 "resource_payload_write_error",
1743 resource,
1744 format!("resource source is not valid UTF-8: {err}"),
1745 )
1746 })?;
1747 backend
1748 .write_payload(kind, expected_digest, &content)
1749 .await
1750 .map_err(|err| {
1751 Diagnostic::error(
1752 "resource_payload_write_error",
1753 resource,
1754 format!("could not write payload: {err}"),
1755 )
1756 })
1757}
1758
1759fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) {
1764 for graph in &desired.graphs {
1765 let graph_address = graph_address(&graph.id);
1766 if !state
1767 .applied_revision
1768 .resources
1769 .contains_key(&graph_address)
1770 {
1771 continue;
1772 }
1773 let schema_digest = state
1774 .applied_revision
1775 .resources
1776 .get(&schema_address(&graph.id))
1777 .map(|resource| resource.digest.clone());
1778 let query_digests = state_query_digests_for_graph(state, &graph.id);
1779 let embedding_provider = graph.embedding_provider.as_deref();
1780 let embedding_provider_digest = embedding_provider
1781 .and_then(|address| state.applied_revision.resources.get(address))
1782 .map(|resource| resource.digest.clone());
1783 let digest = graph_digest(
1784 &graph.id,
1785 schema_digest.as_ref(),
1786 Some(&query_digests),
1787 embedding_provider,
1788 embedding_provider_digest.as_ref(),
1789 );
1790 state.applied_revision.resources.insert(
1791 graph_address,
1792 StateResource {
1793 digest,
1794 applies_to: None,
1795 embedding_provider: graph.embedding_provider.clone(),
1796 embedding_profile: None,
1797 },
1798 );
1799 }
1800}
1801
1802fn duplicate_key_diagnostics(text: &str) -> Vec<Diagnostic> {
1803 #[derive(Debug)]
1804 struct Frame {
1805 indent: isize,
1806 path: String,
1807 keys: BTreeSet<String>,
1808 }
1809
1810 let mut diagnostics = Vec::new();
1811 let mut stack = vec![Frame {
1812 indent: -1,
1813 path: String::new(),
1814 keys: BTreeSet::new(),
1815 }];
1816
1817 for (line_idx, line) in text.lines().enumerate() {
1818 let line_without_comment = strip_comment(line);
1819 if line_without_comment.trim().is_empty() {
1820 continue;
1821 }
1822 let indent = line_without_comment
1823 .chars()
1824 .take_while(|ch| *ch == ' ')
1825 .count() as isize;
1826 let trimmed = line_without_comment.trim_start();
1827 if trimmed.starts_with('-') {
1828 continue;
1829 }
1830 let Some((raw_key, raw_value)) = trimmed.split_once(':') else {
1831 continue;
1832 };
1833 let key = raw_key.trim();
1834 if key.is_empty() || key.starts_with('{') || key.starts_with('[') {
1835 continue;
1836 }
1837
1838 while stack.last().is_some_and(|frame| indent <= frame.indent) {
1839 stack.pop();
1840 }
1841 let parent = stack.last_mut().expect("root frame is always present");
1842 let full_path = if parent.path.is_empty() {
1843 key.to_string()
1844 } else {
1845 format!("{}.{}", parent.path, key)
1846 };
1847 if !parent.keys.insert(key.to_string()) {
1848 diagnostics.push(Diagnostic::error(
1849 "duplicate_yaml_key",
1850 full_path.clone(),
1851 format!("duplicate YAML key `{key}` on line {}", line_idx + 1),
1852 ));
1853 }
1854 if raw_value.trim().is_empty() {
1855 stack.push(Frame {
1856 indent,
1857 path: full_path,
1858 keys: BTreeSet::new(),
1859 });
1860 }
1861 }
1862
1863 diagnostics
1864}
1865
/// Returns `line` with any trailing YAML comment removed.
///
/// Rules applied, following YAML:
/// - `#` starts a comment only at the beginning of the line or right after
///   whitespace, so `http://host/path#anchor` is kept intact;
/// - `#` inside a single- or double-quoted scalar is never a comment;
/// - a quote character opens a quoted scalar only where a scalar can begin:
///   at the start of the line, after whitespace, or after one of `:` `,` `[`
///   `{`. An apostrophe inside a plain word such as `don't` therefore does not
///   hide a later comment;
/// - inside double quotes, `\` escapes the next character; inside single
///   quotes, `''` is an escaped quote and does not close the scalar.
///
/// Whitespace preceding the comment is preserved in the returned string.
fn strip_comment(line: &str) -> String {
    let mut in_single_quote = false;
    let mut in_double_quote = false;
    let mut escaped = false;
    let mut prev: Option<char> = None;
    let mut chars = line.char_indices().peekable();

    while let Some((idx, ch)) = chars.next() {
        if in_single_quote {
            if ch == '\'' {
                if chars.peek().is_some_and(|&(_, next)| next == '\'') {
                    // `''` is a literal quote inside a single-quoted scalar.
                    chars.next();
                } else {
                    in_single_quote = false;
                }
            }
        } else if in_double_quote {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_double_quote = false;
            }
        } else {
            let after_whitespace = match prev {
                None => true,
                Some(p) => p.is_whitespace(),
            };
            let scalar_may_start =
                after_whitespace || matches!(prev, Some(':' | ',' | '[' | '{'));
            match ch {
                '\'' if scalar_may_start => in_single_quote = true,
                '"' if scalar_may_start => in_double_quote = true,
                '#' if after_whitespace => return line[..idx].to_string(),
                _ => {}
            }
        }
        prev = Some(ch);
    }

    line.to_string()
}
1887
1888fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap<String, String> {
1889 let prefix = format!("query.{graph_id}.");
1890 state
1891 .applied_revision
1892 .resources
1893 .iter()
1894 .filter_map(|(address, resource)| {
1895 address
1896 .strip_prefix(&prefix)
1897 .map(|name| (name.to_string(), resource.digest.clone()))
1898 })
1899 .collect()
1900}
1901
1902fn state_graph_embedding_provider(state: &ClusterState, graph_id: &str) -> Option<String> {
1903 state
1904 .applied_revision
1905 .resources
1906 .get(&graph_address(graph_id))
1907 .and_then(|resource| resource.embedding_provider.clone())
1908}
1909
1910fn state_embedding_provider_digest(
1911 state: &ClusterState,
1912 embedding_provider: Option<&str>,
1913) -> Option<String> {
1914 embedding_provider
1915 .and_then(|address| state.applied_revision.resources.get(address))
1916 .map(|resource| resource.digest.clone())
1917}
1918
1919fn set_resource_status_applied(state: &mut ClusterState, address: &str) {
1920 state.resource_statuses.insert(
1921 address.to_string(),
1922 ResourceStatusRecord {
1923 status: ResourceLifecycleStatus::Applied,
1924 conditions: Vec::new(),
1925 message: None,
1926 },
1927 );
1928}
1929
1930fn set_resource_status(
1931 state: &mut ClusterState,
1932 address: &str,
1933 status: ResourceLifecycleStatus,
1934 condition: &str,
1935 message: &str,
1936) {
1937 state.resource_statuses.insert(
1938 address.to_string(),
1939 ResourceStatusRecord {
1940 status,
1941 conditions: vec![condition.to_string()],
1942 message: Some(message.to_string()),
1943 },
1944 );
1945}
1946
1947fn graph_digest(
1948 graph_id: &str,
1949 schema_digest: Option<&String>,
1950 query_digests: Option<&BTreeMap<String, String>>,
1951 embedding_provider: Option<&str>,
1952 embedding_provider_digest: Option<&String>,
1953) -> String {
1954 let mut input = format!(
1955 "graph\0{graph_id}\0schema\0{}\0",
1956 schema_digest.map_or("", String::as_str)
1957 );
1958 if let Some(query_digests) = query_digests {
1959 for (name, digest) in query_digests {
1960 input.push_str("query\0");
1961 input.push_str(name);
1962 input.push('\0');
1963 input.push_str(digest);
1964 input.push('\0');
1965 }
1966 }
1967 if let Some(provider) = embedding_provider {
1968 input.push_str("embedding_provider\0");
1969 input.push_str(provider);
1970 input.push('\0');
1971 input.push_str(embedding_provider_digest.map_or("", String::as_str));
1972 input.push('\0');
1973 }
1974 sha256_hex(input.as_bytes())
1975}
1976
1977fn embedding_provider_digest(profile: &EmbeddingProviderConfig) -> String {
1978 let mut input = String::from("embedding-provider\0");
1979 let config_semantics =
1980 serde_json::to_string(profile).expect("embedding provider config must serialize");
1981 input.push_str(&config_semantics);
1982 sha256_hex(input.as_bytes())
1983}
1984
1985async fn live_schema_matches_recorded_digest(
1986 graph_uri: &str,
1987 recorded_schema_digest: Option<&str>,
1988 observed_manifest_version: Option<u64>,
1989) -> bool {
1990 let Some(recorded_schema_digest) = recorded_schema_digest else {
1991 return false;
1992 };
1993 let Some(observed_manifest_version) = observed_manifest_version else {
1994 return false;
1995 };
1996 let Ok(db) = Omnigraph::open_read_only(graph_uri).await else {
1997 return false;
1998 };
1999 let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await else {
2000 return false;
2001 };
2002 if snapshot.version() != observed_manifest_version {
2003 return false;
2004 }
2005 sha256_hex(db.schema_source().as_bytes()) == recorded_schema_digest
2006}
2007
2008fn desired_config_digest(
2009 raw: &RawClusterConfig,
2010 resource_digests: &BTreeMap<String, String>,
2011) -> String {
2012 let mut input = String::from("cluster-config\0");
2013 let config_semantics =
2016 serde_json::to_string(raw).expect("raw cluster config must serialize deterministically");
2017 input.push_str(&config_semantics);
2018 input.push('\0');
2019 for (address, digest) in resource_digests {
2020 input.push_str(address);
2021 input.push('\0');
2022 input.push_str(digest);
2023 input.push('\0');
2024 }
2025 sha256_hex(input.as_bytes())
2026}
2027
2028fn sha256_hex(bytes: &[u8]) -> String {
2029 let digest = Sha256::digest(bytes);
2030 const HEX: &[u8; 16] = b"0123456789abcdef";
2031 let mut out = String::with_capacity(digest.len() * 2);
2032 for byte in digest {
2033 out.push(HEX[(byte >> 4) as usize] as char);
2034 out.push(HEX[(byte & 0x0f) as usize] as char);
2035 }
2036 out
2037}
2038
2039fn now_rfc3339() -> String {
2040 OffsetDateTime::now_utc()
2041 .format(&Rfc3339)
2042 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2043}
2044
2045fn lock_age_seconds(created_at: &str) -> Option<u64> {
2046 let created_at = OffsetDateTime::parse(created_at, &Rfc3339).ok()?;
2047 Some(
2048 (OffsetDateTime::now_utc() - created_at)
2049 .whole_seconds()
2050 .max(0) as u64,
2051 )
2052}
2053
2054fn state_sync_operation_label(operation: StateSyncOperation) -> &'static str {
2055 match operation {
2056 StateSyncOperation::Refresh => "refresh",
2057 StateSyncOperation::Import => "import",
2058 }
2059}
2060
2061fn has_errors(diagnostics: &[Diagnostic]) -> bool {
2062 diagnostics
2063 .iter()
2064 .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
2065}
2066
2067fn count_errors(diagnostics: &[Diagnostic]) -> usize {
2068 diagnostics
2069 .iter()
2070 .filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
2071 .count()
2072}
2073
/// Renders `path` as an owned string for output. Invalid Unicode is replaced
/// with U+FFFD, as `Path::display` does.
fn display_path(path: &Path) -> String {
    path.to_string_lossy().into_owned()
}
2077
2078#[cfg(test)]
2079#[path = "tests.rs"]
2080mod tests;