Skip to main content

omnigraph_cluster/
lib.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fs::{self, OpenOptions};
3use std::io::{ErrorKind, Write};
4use std::path::{Path, PathBuf};
5use std::process;
6
7use omnigraph::db::{Omnigraph, ReadTarget, SchemaApplyOptions};
8use omnigraph_compiler::SchemaMigrationPlan;
9use omnigraph_compiler::build_catalog;
10use omnigraph_compiler::query::parser::parse_query;
11use omnigraph_compiler::query::typecheck::typecheck_query_decl;
12use omnigraph_compiler::schema::parser::parse_schema;
13use serde::{Deserialize, Serialize};
14use serde_json::json;
15use sha2::{Digest, Sha256};
16use time::OffsetDateTime;
17use time::format_description::well_known::Rfc3339;
18use ulid::Ulid;
19
20pub mod failpoints;
21
22mod config;
23mod diff;
24mod serve;
25mod store;
26mod sweep;
27mod types;
28use config::{
29    QueriesDecl, future_field_diagnostics, graph_address, initial_import_state, load_desired,
30    normalize_policy_target, observe_declared_graphs, observe_live_graph, parse_cluster_config,
31    policy_address, preview_schema_migration, query_address, resolve_config_path,
32    resolve_query_decls, schema_address, state_resource_digests, validate_cluster_header,
33    validate_id, validate_query_source,
34};
35use diff::{
36    FailedGraphOrigin, ResourceKind, append_embedding_profile_changes,
37    append_policy_binding_changes, approved_resources, classify_changes, compute_approvals,
38    compute_blast_radius, demote_dependents_of_failed_graphs, diff_resources, resource_kind,
39};
40pub use serve::{
41    ServingGraph, ServingPolicy, ServingQuery, ServingSnapshot, cluster_graph_ids,
42    cluster_root_for_graph_uri, read_serving_snapshot, read_serving_snapshot_from_storage,
43    resolve_graph_storage_uri,
44};
45use store::{ClusterStore, StateLockGuard, StateSnapshot};
46use sweep::{
47    mark_approvals_consumed, record_approval_consumed, sweep_recovery_sidecars,
48    tombstone_graph_subtree, warn_pending_recovery_sidecars,
49};
50pub use types::*;
51use types::*;
52
/// File name of the declarative cluster config inside a config directory.
pub const CLUSTER_CONFIG_FILE: &str = "cluster.yaml";
/// Directory name used for graphs.
pub const CLUSTER_GRAPHS_DIR: &str = "graphs";

// Cluster-managed metadata all lives under one reserved directory. The nested
// paths are spelled out as literals (rather than built from
// `CLUSTER_STATE_DIR`) because `&str` consts cannot be concatenated in a const
// context with the standard library alone.

/// Reserved directory holding cluster-managed metadata.
pub const CLUSTER_STATE_DIR: &str = "__cluster";
/// Recorded cluster state; `cluster apply` refuses to run without it.
pub const CLUSTER_STATE_FILE: &str = "__cluster/state.json";
/// Cluster state lock file.
pub const CLUSTER_LOCK_FILE: &str = "__cluster/lock.json";
/// Directory for stored resource payloads (presumably the content-addressed blobs).
pub const CLUSTER_RESOURCES_DIR: &str = "__cluster/resources";
/// Directory presumably holding recovery sidecars of graph-moving operations.
pub const CLUSTER_RECOVERIES_DIR: &str = "__cluster/recoveries";
/// Directory presumably holding approval artifacts.
pub const CLUSTER_APPROVALS_DIR: &str = "__cluster/approvals";
61
62/// The store for a load outcome: the declared `storage:` root when present,
63/// the config directory itself otherwise. A bad root is a loud error.
64fn store_for(config_dir: &Path, storage_root: Option<&str>) -> Result<ClusterStore, Diagnostic> {
65    match storage_root {
66        Some(root) => ClusterStore::for_storage_root(root),
67        None => Ok(ClusterStore::for_config_dir(config_dir)),
68    }
69}
70
71pub fn validate_config_dir(config_dir: impl AsRef<Path>) -> ValidateOutput {
72    let outcome = load_desired(config_dir.as_ref());
73    let (resource_digests, resources, dependencies) = match outcome.desired {
74        Some(desired) => (
75            desired.resource_digests,
76            desired.resources,
77            desired.dependencies,
78        ),
79        None => (BTreeMap::new(), Vec::new(), Vec::new()),
80    };
81    let ok = !has_errors(&outcome.diagnostics);
82
83    ValidateOutput {
84        ok,
85        config_dir: display_path(&outcome.config_dir),
86        config_file: display_path(&outcome.config_file),
87        resource_digests,
88        resources,
89        dependencies,
90        diagnostics: outcome.diagnostics,
91    }
92}
93
94pub async fn plan_config_dir(config_dir: impl AsRef<Path>) -> PlanOutput {
95    let outcome = load_desired(config_dir.as_ref());
96    let mut diagnostics = outcome.diagnostics;
97    let storage_root = outcome
98        .desired
99        .as_ref()
100        .and_then(|desired| desired.storage_root.clone());
101    let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
102        Ok(backend) => backend,
103        Err(diagnostic) => {
104            diagnostics.push(diagnostic);
105            ClusterStore::for_config_dir(&outcome.config_dir)
106        }
107    };
108    let mut observations = backend.observations();
109
110    let Some(desired) = outcome.desired else {
111        return PlanOutput {
112            ok: false,
113            config_dir: display_path(&outcome.config_dir),
114            desired_revision: DesiredRevision {
115                config_digest: None,
116            },
117            resource_digests: BTreeMap::new(),
118            dependencies: Vec::new(),
119            state_observations: observations,
120            changes: Vec::new(),
121            blast_radius: Vec::new(),
122            approvals_required: Vec::new(),
123            diagnostics,
124        };
125    };
126
127    if has_errors(&diagnostics) {
128        return PlanOutput {
129            ok: false,
130            config_dir: display_path(&desired.config_dir),
131            desired_revision: DesiredRevision {
132                config_digest: Some(desired.config_digest),
133            },
134            resource_digests: desired.resource_digests,
135            dependencies: desired.dependencies,
136            state_observations: observations,
137            changes: Vec::new(),
138            blast_radius: Vec::new(),
139            approvals_required: Vec::new(),
140            diagnostics,
141        };
142    }
143
144    let _lock_guard = if desired.state_lock {
145        match backend.acquire_lock("plan", &mut observations).await {
146            Ok(guard) => Some(guard),
147            Err(diagnostic) => {
148                diagnostics.push(diagnostic);
149                None
150            }
151        }
152    } else {
153        diagnostics.push(Diagnostic::warning(
154            "state_lock_disabled",
155            "state.lock",
156            "state.lock is false; plan read state without acquiring the cluster state lock",
157        ));
158        None
159    };
160
161    // Plan is read-only: pending sidecars are reported, never acted on
162    // (RFC-004 open question 3 keeps read-only commands warn-only).
163    warn_pending_recovery_sidecars(&backend, &mut diagnostics).await;
164
165    let mut prior_resources = BTreeMap::new();
166    let mut prior_state: Option<ClusterState> = None;
167    if !has_errors(&diagnostics) {
168        match backend.read_state(&mut observations).await {
169            Ok(snapshot) => {
170                if let Some(state) = snapshot.state {
171                    prior_resources = state_resource_digests(&state);
172                    prior_state = Some(state);
173                }
174            }
175            Err(diagnostic) => diagnostics.push(diagnostic),
176        }
177    }
178
179    let mut changes = if has_errors(&diagnostics) {
180        Vec::new()
181    } else {
182        diff_resources(&prior_resources, &desired.resource_digests)
183    };
184    if !has_errors(&diagnostics) {
185        append_policy_binding_changes(&mut changes, prior_state.as_ref(), &desired);
186        append_embedding_profile_changes(&mut changes, prior_state.as_ref(), &desired);
187    }
188    // Plan previews dispositions without sweeping; a pending recovery is
189    // surfaced as the cluster_recovery_pending warning above instead.
190    let artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
191    let approved = approved_resources(
192        &artifacts,
193        &changes,
194        &desired.config_digest,
195        &mut diagnostics,
196    );
197    classify_changes(
198        &mut changes,
199        &desired.dependencies,
200        &BTreeSet::new(),
201        &approved,
202    );
203
204    // Embed real migration steps for schema updates so plan is a data-aware
205    // preview; failures degrade to the digest diff with a warning.
206    for change in &mut changes {
207        if change.operation != PlanOperation::Update {
208            continue;
209        }
210        let ResourceKind::Schema(graph_id) = resource_kind(&change.resource) else {
211            continue;
212        };
213        let graph_uri = backend.graph_root(&graph_id);
214        let source_path = desired
215            .resources
216            .iter()
217            .find(|resource| resource.address == change.resource)
218            .and_then(|resource| resource.path.clone());
219        let preview = match source_path {
220            Some(path) => preview_schema_migration(&graph_uri, &path).await,
221            None => Err("no schema source recorded".to_string()),
222        };
223        match preview {
224            Ok(migration) => change.migration = Some(migration),
225            Err(err) => diagnostics.push(Diagnostic::warning(
226                "schema_preview_unavailable",
227                change.resource.clone(),
228                format!("could not preview the schema migration: {err}"),
229            )),
230        }
231    }
232    let blast_radius = compute_blast_radius(&changes, &desired.dependencies);
233    let approvals_required = compute_approvals(&changes, &approved);
234    let ok = !has_errors(&diagnostics);
235
236    PlanOutput {
237        ok,
238        config_dir: display_path(&desired.config_dir),
239        desired_revision: DesiredRevision {
240            config_digest: Some(desired.config_digest),
241        },
242        resource_digests: desired.resource_digests,
243        dependencies: desired.dependencies,
244        state_observations: observations,
245        changes,
246        blast_radius,
247        approvals_required,
248        diagnostics,
249    }
250}
251
252/// Config-only `cluster apply` (Stage 3A): execute the query/policy subset of
253/// the plan against the local cluster catalog. The plan is recomputed under
254/// the state lock, so freshness is structural; the state CAS inside
255/// `write_state` is the second fence. Graph/schema changes are never executed
256/// here — they are deferred to the graph-lifecycle phase and reported loudly.
257///
258/// Payloads are content-addressed and written BEFORE the state CAS because
259/// state is the publish point: a failure after payload writes leaves inert
260/// digest-named blobs and no success acknowledgement; re-running apply is the
261/// repair.
262/// Options for `cluster apply`. `actor` attributes graph-moving operations
263/// (recorded in sidecars and audit entries, threaded to the engine's
264/// `apply_schema_as` so Cedar enforcement fires wherever a policy checker is
265/// installed).
266#[derive(Debug, Clone, Default)]
267pub struct ApplyOptions {
268    pub actor: Option<String>,
269}
270
271pub async fn apply_config_dir(config_dir: impl AsRef<Path>) -> ApplyOutput {
272    apply_config_dir_with_options(config_dir, ApplyOptions::default()).await
273}
274
275pub async fn apply_config_dir_with_options(
276    config_dir: impl AsRef<Path>,
277    options: ApplyOptions,
278) -> ApplyOutput {
279    let outcome = load_desired(config_dir.as_ref());
280    let mut diagnostics = outcome.diagnostics;
281    let storage_root = outcome
282        .desired
283        .as_ref()
284        .and_then(|desired| desired.storage_root.clone());
285    let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
286        Ok(backend) => backend,
287        Err(diagnostic) => {
288            diagnostics.push(diagnostic);
289            ClusterStore::for_config_dir(&outcome.config_dir)
290        }
291    };
292    let mut observations = backend.observations();
293
294    let actor_for_output = options.actor.clone();
295    let early_return = |config_dir: String,
296                        config_digest: Option<String>,
297                        observations: StateObservations,
298                        changes: Vec<PlanChange>,
299                        resource_statuses: BTreeMap<String, ResourceStatusRecord>,
300                        diagnostics: Vec<Diagnostic>| {
301        ApplyOutput {
302            ok: !has_errors(&diagnostics),
303            config_dir,
304            actor: actor_for_output.clone(),
305            desired_revision: DesiredRevision { config_digest },
306            state_observations: observations,
307            changes,
308            applied_count: 0,
309            deferred_count: 0,
310            converged: false,
311            state_written: false,
312            resource_statuses,
313            diagnostics,
314        }
315    };
316
317    let Some(desired) = outcome.desired else {
318        return early_return(
319            display_path(&outcome.config_dir),
320            None,
321            observations,
322            Vec::new(),
323            BTreeMap::new(),
324            diagnostics,
325        );
326    };
327
328    if has_errors(&diagnostics) {
329        return early_return(
330            display_path(&desired.config_dir),
331            Some(desired.config_digest),
332            observations,
333            Vec::new(),
334            BTreeMap::new(),
335            diagnostics,
336        );
337    }
338
339    // Named guard: the lock must be held until the state outcome is recorded.
340    let _lock_guard = if desired.state_lock {
341        match backend.acquire_lock("apply", &mut observations).await {
342            Ok(guard) => Some(guard),
343            Err(diagnostic) => {
344                diagnostics.push(diagnostic);
345                None
346            }
347        }
348    } else {
349        diagnostics.push(Diagnostic::warning(
350            "state_lock_disabled",
351            "state.lock",
352            "state.lock is false; apply wrote state without acquiring the cluster state lock",
353        ));
354        None
355    };
356
357    if has_errors(&diagnostics) {
358        return early_return(
359            display_path(&desired.config_dir),
360            Some(desired.config_digest),
361            observations,
362            Vec::new(),
363            BTreeMap::new(),
364            diagnostics,
365        );
366    }
367
368    let snapshot = match backend.read_state(&mut observations).await {
369        Ok(snapshot) => snapshot,
370        Err(diagnostic) => {
371            diagnostics.push(diagnostic);
372            return early_return(
373                display_path(&desired.config_dir),
374                Some(desired.config_digest),
375                observations,
376                Vec::new(),
377                BTreeMap::new(),
378                diagnostics,
379            );
380        }
381    };
382    let expected_cas = snapshot.state_cas;
383    let Some(mut state) = snapshot.state else {
384        diagnostics.push(Diagnostic::error(
385            "state_missing",
386            CLUSTER_STATE_FILE,
387            "apply requires an existing state.json; run `cluster import` to bootstrap state",
388        ));
389        return early_return(
390            display_path(&desired.config_dir),
391            Some(desired.config_digest),
392            observations,
393            Vec::new(),
394            BTreeMap::new(),
395            diagnostics,
396        );
397    };
398
399    // Snapshot the as-read state BEFORE the sweep so sweep mutations count as
400    // changes for the final dirty check and get persisted by the state CAS.
401    let before_value =
402        serde_json::to_value(&state).expect("cluster state must serialize deterministically");
403    let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
404
405    let prior_resources = state_resource_digests(&state);
406    let mut changes = diff_resources(&prior_resources, &desired.resource_digests);
407    append_policy_binding_changes(&mut changes, Some(&state), &desired);
408    append_embedding_profile_changes(&mut changes, Some(&state), &desired);
409    let approval_artifacts = backend.list_approval_artifacts(&mut diagnostics).await;
410    let approved = approved_resources(
411        &approval_artifacts,
412        &changes,
413        &desired.config_digest,
414        &mut diagnostics,
415    );
416    classify_changes(
417        &mut changes,
418        &desired.dependencies,
419        &sweep.pending_graphs,
420        &approved,
421    );
422
423    // Defensive invariant: nothing the approval gate covers may be executable
424    // WITHOUT a matching approval. Gated changes with a valid artifact are the
425    // sanctioned exception (stage 4C).
426    let approvals = compute_approvals(&changes, &approved);
427    let approval_violation = changes.iter().any(|change| {
428        change.disposition == Some(ApplyDisposition::Applied)
429            && approvals
430                .iter()
431                .any(|approval| approval.resource == change.resource && !approval.satisfied)
432    });
433    if approval_violation {
434        diagnostics.push(Diagnostic::error(
435            "apply_approval_invariant_violation",
436            "changes",
437            "an executable change requires approval; refusing to apply",
438        ));
439        return early_return(
440            display_path(&desired.config_dir),
441            Some(desired.config_digest),
442            observations,
443            changes,
444            state.resource_statuses,
445            diagnostics,
446        );
447    }
448
449    // Graph creates execute first (RFC-004 §D5), sequentially, sidecar-fenced:
450    // sidecar written before the init, rewritten with the post-init manifest
451    // version, deleted only after the final state CAS lands. A failure stops
452    // further graph-moving work and demotes that graph's dependents.
453    let source_paths: BTreeMap<&str, &str> = desired
454        .resources
455        .iter()
456        .filter_map(|resource| {
457            resource
458                .path
459                .as_deref()
460                .map(|path| (resource.address.as_str(), path))
461        })
462        .collect();
463    let graph_creates_to_run: Vec<String> = changes
464        .iter()
465        .filter(|change| {
466            change.disposition == Some(ApplyDisposition::Applied)
467                && change.operation == PlanOperation::Create
468                && matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
469        })
470        .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
471        .collect();
472    let mut completed_op_sidecars: Vec<String> = Vec::new();
473    let mut failed_graphs: BTreeMap<String, FailedGraphOrigin> = BTreeMap::new();
474    let mut graph_moving_aborted = false;
475    for graph_id in &graph_creates_to_run {
476        if graph_moving_aborted {
477            // A prior create failed: stop graph-moving work (loud partials).
478            diagnostics.push(Diagnostic::warning(
479                "graph_create_skipped",
480                graph_address(graph_id),
481                "skipped after an earlier graph create failed in this run",
482            ));
483            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
484            continue;
485        }
486        let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
487            continue;
488        };
489        let graph_uri = backend.graph_root(graph_id);
490        let mut sidecar = RecoverySidecar {
491            schema_version: 1,
492            operation_id: Ulid::new().to_string(),
493            started_at: now_rfc3339(),
494            actor: options.actor.clone(),
495            kind: RecoverySidecarKind::GraphCreate,
496            graph_id: graph_id.clone(),
497            graph_uri: graph_uri.clone(),
498            observed_manifest_version: None,
499            expected_manifest_version: None,
500            desired_schema_digest: desired_graph.schema_digest.clone(),
501            state_cas_base: expected_cas.clone(),
502            approval_id: None,
503        };
504        let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
505            Ok(path) => path,
506            Err(diagnostic) => {
507                diagnostics.push(diagnostic);
508                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
509                graph_moving_aborted = true;
510                continue;
511            }
512        };
513        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_CREATE) {
514            // Simulated crash before the init: the sidecar stays for the
515            // sweep (row 1: root absent -> intent removed next run).
516            diagnostics.push(diagnostic);
517            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
518            graph_moving_aborted = true;
519            continue;
520        }
521        // Re-read + re-verify the schema source under the lock — the same
522        // TOCTOU posture as write_resource_payload.
523        let schema_source = source_paths
524            .get(schema_address(graph_id).as_str())
525            .ok_or_else(|| {
526                Diagnostic::error(
527                    "graph_create_failed",
528                    graph_address(graph_id),
529                    "no schema source recorded for graph",
530                )
531            })
532            .and_then(|path| {
533                fs::read_to_string(Path::new(path)).map_err(|err| {
534                    Diagnostic::error(
535                        "graph_create_failed",
536                        graph_address(graph_id),
537                        format!("could not read schema source '{path}': {err}"),
538                    )
539                })
540            })
541            .and_then(|source| {
542                if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
543                    Ok(source)
544                } else {
545                    Err(Diagnostic::error(
546                        "resource_content_changed",
547                        schema_address(graph_id),
548                        "schema source changed while apply was running; re-run `cluster apply`",
549                    ))
550                }
551            });
552        let schema_source = match schema_source {
553            Ok(source) => source,
554            Err(diagnostic) => {
555                diagnostics.push(diagnostic);
556                backend.delete_object(&sidecar_path).await; // nothing moved
557                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
558                graph_moving_aborted = true;
559                continue;
560            }
561        };
562        match Omnigraph::init(&graph_uri, &schema_source).await {
563            Ok(_) => {}
564            Err(err) => {
565                diagnostics.push(Diagnostic::error(
566                    "graph_create_failed",
567                    graph_address(graph_id),
568                    format!("could not initialize graph at '{graph_uri}': {err}"),
569                ));
570                // The sidecar stays: the sweep classifies whether the failed
571                // init left a partial root (row 5) or nothing (row 1).
572                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphCreate);
573                graph_moving_aborted = true;
574                continue;
575            }
576        }
577        // Record the post-init pin in the sidecar (best effort — a failure
578        // here leaves expected = null and the sweep classifies by digest).
579        if let Ok(db) = Omnigraph::open_read_only(&graph_uri).await {
580            if let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await {
581                sidecar.expected_manifest_version = Some(snapshot.version());
582                if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
583                    diagnostics.push(diagnostic);
584                }
585            }
586        }
587        // Crash point: the graph exists, the cluster state does not record it
588        // yet. A failure here must acknowledge nothing; the next run's sweep
589        // rolls the ledger forward (row 4).
590        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_CREATE) {
591            diagnostics.push(diagnostic);
592            return early_return(
593                display_path(&desired.config_dir),
594                Some(desired.config_digest),
595                observations,
596                changes,
597                state.resource_statuses,
598                diagnostics,
599            );
600        }
601        completed_op_sidecars.push(sidecar_path);
602    }
603
604    // Schema applies execute next (RFC-004 §D5): the first cluster operation
605    // that moves an EXISTING graph manifest, sidecar-fenced the same way.
606    let schema_updates_to_run: Vec<String> = changes
607        .iter()
608        .filter(|change| {
609            change.disposition == Some(ApplyDisposition::Applied)
610                && change.operation == PlanOperation::Update
611                && matches!(resource_kind(&change.resource), ResourceKind::Schema(_))
612        })
613        .filter_map(|change| change.resource.strip_prefix("schema.").map(str::to_string))
614        .collect();
615    for graph_id in &schema_updates_to_run {
616        if graph_moving_aborted {
617            diagnostics.push(Diagnostic::warning(
618                "schema_apply_skipped",
619                schema_address(graph_id),
620                "skipped after an earlier graph-moving operation failed in this run",
621            ));
622            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
623            continue;
624        }
625        let Some(desired_graph) = desired.graphs.iter().find(|graph| &graph.id == graph_id) else {
626            continue;
627        };
628        let graph_uri = backend.graph_root(graph_id);
629        // Read-write open: the engine's own recovery sweep runs here, which
630        // is exactly what we want before moving its manifest.
631        let db = match Omnigraph::open(&graph_uri).await {
632            Ok(db) => db,
633            Err(err) => {
634                diagnostics.push(Diagnostic::error(
635                    "schema_apply_failed",
636                    schema_address(graph_id),
637                    format!("could not open graph at '{graph_uri}': {err}"),
638                ));
639                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
640                graph_moving_aborted = true;
641                continue;
642            }
643        };
644        // Re-read + digest-verify the desired schema source before the
645        // cluster sidecar exists. Parser/planner rejections cannot have
646        // moved graph state, so they must not leave recovery work behind.
647        let schema_source = source_paths
648            .get(schema_address(graph_id).as_str())
649            .ok_or_else(|| {
650                Diagnostic::error(
651                    "schema_apply_failed",
652                    schema_address(graph_id),
653                    "no schema source recorded for graph",
654                )
655            })
656            .and_then(|path| {
657                fs::read_to_string(Path::new(path)).map_err(|err| {
658                    Diagnostic::error(
659                        "schema_apply_failed",
660                        schema_address(graph_id),
661                        format!("could not read schema source '{path}': {err}"),
662                    )
663                })
664            })
665            .and_then(|source| {
666                if sha256_hex(source.as_bytes()) == desired_graph.schema_digest {
667                    Ok(source)
668                } else {
669                    Err(Diagnostic::error(
670                        "resource_content_changed",
671                        schema_address(graph_id),
672                        "schema source changed while apply was running; re-run `cluster apply`",
673                    ))
674                }
675            });
676        let schema_source = match schema_source {
677            Ok(source) => source,
678            Err(diagnostic) => {
679                diagnostics.push(diagnostic);
680                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
681                graph_moving_aborted = true;
682                continue;
683            }
684        };
685        if let Err(err) = db
686            .preview_schema_apply_with_options(&schema_source, SchemaApplyOptions::default())
687            .await
688        {
689            diagnostics.push(Diagnostic::error(
690                "schema_apply_failed",
691                schema_address(graph_id),
692                format!("schema apply is not supported on '{graph_uri}': {err}"),
693            ));
694            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
695            graph_moving_aborted = true;
696            continue;
697        }
698        let observed_manifest_version = match db.snapshot_of(ReadTarget::branch("main")).await {
699            Ok(snapshot) => Some(snapshot.version()),
700            Err(_) => None,
701        };
702        let recorded_schema_digest = state
703            .applied_revision
704            .resources
705            .get(&schema_address(graph_id))
706            .map(|entry| entry.digest.clone());
707        let mut sidecar = RecoverySidecar {
708            schema_version: 1,
709            operation_id: Ulid::new().to_string(),
710            started_at: now_rfc3339(),
711            actor: options.actor.clone(),
712            kind: RecoverySidecarKind::SchemaApply,
713            graph_id: graph_id.clone(),
714            graph_uri: graph_uri.clone(),
715            observed_manifest_version,
716            expected_manifest_version: None,
717            desired_schema_digest: desired_graph.schema_digest.clone(),
718            state_cas_base: expected_cas.clone(),
719            approval_id: None,
720        };
721        let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
722            Ok(path) => path,
723            Err(diagnostic) => {
724                diagnostics.push(diagnostic);
725                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
726                graph_moving_aborted = true;
727                continue;
728            }
729        };
730        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_SCHEMA_APPLY) {
731            // Simulated crash before the engine call: the sidecar stays; the
732            // sweep retires it next run (ledger still consistent with live).
733            diagnostics.push(diagnostic);
734            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
735            graph_moving_aborted = true;
736            continue;
737        }
738        // Soft drops only: allow_data_loss stays false until the approval
739        // artifacts of stage 4C exist (RFC-004 §D4).
740        match db
741            .apply_schema_as(
742                &schema_source,
743                SchemaApplyOptions::default(),
744                options.actor.as_deref(),
745            )
746            .await
747        {
748            Ok(result) => {
749                sidecar.expected_manifest_version = Some(result.manifest_version);
750                if let Err(diagnostic) = backend.write_recovery_sidecar(&sidecar).await {
751                    diagnostics.push(diagnostic);
752                }
753            }
754            Err(err) => {
755                diagnostics.push(Diagnostic::error(
756                    "schema_apply_failed",
757                    schema_address(graph_id),
758                    format!("schema apply failed on '{graph_uri}': {err}"),
759                ));
760                if live_schema_matches_recorded_digest(
761                    &graph_uri,
762                    recorded_schema_digest.as_deref(),
763                    observed_manifest_version,
764                )
765                .await
766                {
767                    // Pre-movement rejection: nothing moved, so retire the
768                    // sidecar eagerly. A delete failure leaves it safe (the
769                    // graph is quarantined until the next sweep), but surface
770                    // it so an operator isn't left debugging a silent stick.
771                    if let Err(err) = backend.try_delete_object(&sidecar_path).await {
772                        diagnostics.push(Diagnostic::warning(
773                            "recovery_sidecar_cleanup_failed",
774                            sidecar_path.clone(),
775                            format!(
776                                "could not delete the stale recovery sidecar after a pre-movement \
777                                 schema-apply rejection; graph `{graph_id}` stays quarantined until \
778                                 a state-mutating cluster command sweeps it: {err}"
779                            ),
780                        ));
781                    }
782                }
783                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::SchemaApply);
784                graph_moving_aborted = true;
785                continue;
786            }
787        }
788        // Crash point: the manifest moved, the ledger does not record it yet.
789        // A failure here acknowledges nothing; the sweep rolls forward.
790        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_SCHEMA_APPLY) {
791            diagnostics.push(diagnostic);
792            return early_return(
793                display_path(&desired.config_dir),
794                Some(desired.config_digest),
795                observations,
796                changes,
797                state.resource_statuses,
798                diagnostics,
799            );
800        }
801        completed_op_sidecars.push(sidecar_path);
802    }
803
804    if !failed_graphs.is_empty() {
805        demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
806    }
807
808    for change in &changes {
809        match change.disposition {
810            Some(ApplyDisposition::Deferred) => diagnostics.push(Diagnostic::warning(
811                "apply_unsupported_change",
812                change.resource.clone(),
813                "graph/schema changes are not applied in this stage; they are deferred to the graph-lifecycle phase",
814            )),
815            Some(ApplyDisposition::Blocked) => diagnostics.push(Diagnostic::warning(
816                "apply_dependency_blocked",
817                change.resource.clone(),
818                format!(
819                    "blocked by an unapplied or missing dependency ({})",
820                    change.reason.as_deref().unwrap_or("dependency")
821                ),
822            )),
823            _ => {}
824        }
825    }
826
827    // Payload phase: content-addressed writes before the state CAS. Any
828    // failure aborts before state moves; blobs already written are inert.
829    // Gate on payload-phase errors only — sweep errors (e.g. a kept row-5
830    // sidecar) must not abort the run, or their statuses would never persist.
831    let errors_before_payloads = count_errors(&diagnostics);
832    for change in &changes {
833        if change.disposition != Some(ApplyDisposition::Applied)
834            || change.operation == PlanOperation::Delete
835        {
836            continue;
837        }
838        let kind = resource_kind(&change.resource);
839        let digest = change
840            .after_digest
841            .as_deref()
842            .expect("create/update always carries an after digest");
843        if ClusterStore::payload_relative(&kind, digest).is_none() {
844            continue;
845        }
846        let Some(source) = source_paths.get(change.resource.as_str()) else {
847            diagnostics.push(Diagnostic::error(
848                "resource_payload_write_error",
849                change.resource.clone(),
850                "no source file recorded for resource",
851            ));
852            continue;
853        };
854        if let Err(diagnostic) =
855            write_resource_payload(&backend, &kind, Path::new(source), digest, &change.resource)
856                .await
857        {
858            diagnostics.push(diagnostic);
859        }
860    }
861    if count_errors(&diagnostics) > errors_before_payloads {
862        return early_return(
863            display_path(&desired.config_dir),
864            Some(desired.config_digest),
865            observations,
866            changes,
867            state.resource_statuses,
868            diagnostics,
869        );
870    }
871
872    // Crash point: payloads are on disk, state has not moved. A failure here
873    // must leave state.json byte-identical and acknowledge nothing; re-running
874    // apply repairs via the skip-if-exists blob reuse.
875    if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_PAYLOAD_PHASE) {
876        diagnostics.push(diagnostic);
877        return early_return(
878            display_path(&desired.config_dir),
879            Some(desired.config_digest),
880            observations,
881            changes,
882            state.resource_statuses,
883            diagnostics,
884        );
885    }
886
887    // Approved graph deletes execute LAST (RFC-004 §D5): catalog writes for
888    // surviving resources land first, then the irreversible work.
889    let graph_deletes_to_run: Vec<String> = changes
890        .iter()
891        .filter(|change| {
892            change.disposition == Some(ApplyDisposition::Applied)
893                && change.operation == PlanOperation::Delete
894                && matches!(resource_kind(&change.resource), ResourceKind::Graph(_))
895        })
896        .filter_map(|change| change.resource.strip_prefix("graph.").map(str::to_string))
897        .collect();
898    let mut executed_deletes: Vec<(String, Option<String>)> = Vec::new(); // (graph_id, approval_id)
899    let mut consumed_approval_ids: Vec<String> = Vec::new();
900    for graph_id in &graph_deletes_to_run {
901        if graph_moving_aborted {
902            diagnostics.push(Diagnostic::warning(
903                "graph_delete_skipped",
904                graph_address(graph_id),
905                "skipped after an earlier graph-moving operation failed in this run",
906            ));
907            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
908            continue;
909        }
910        let graph_addr = graph_address(graph_id);
911        // Re-locate the consumable approval (classification verified one exists).
912        let approval_id = approval_artifacts
913            .iter()
914            .map(|(_, artifact)| artifact)
915            .find(|artifact| {
916                artifact.consumed_at.is_none()
917                    && artifact.resource == graph_addr
918                    && artifact.bound_config_digest == desired.config_digest
919            })
920            .map(|artifact| artifact.approval_id.clone());
921        let graph_uri = backend.graph_root(graph_id);
922        let observed_manifest_version = match Omnigraph::open_read_only(&graph_uri).await {
923            Ok(db) => match db.snapshot_of(ReadTarget::branch("main")).await {
924                Ok(snapshot) => Some(snapshot.version()),
925                Err(_) => None,
926            },
927            Err(_) => None, // partial/unopenable roots still get deleted
928        };
929        let sidecar = RecoverySidecar {
930            schema_version: 1,
931            operation_id: Ulid::new().to_string(),
932            started_at: now_rfc3339(),
933            actor: options.actor.clone(),
934            kind: RecoverySidecarKind::GraphDelete,
935            graph_id: graph_id.clone(),
936            graph_uri: graph_uri.clone(),
937            observed_manifest_version,
938            expected_manifest_version: None, // no post-op manifest exists
939            desired_schema_digest: String::new(),
940            state_cas_base: expected_cas.clone(),
941            approval_id: approval_id.clone(),
942        };
943        let sidecar_path = match backend.write_recovery_sidecar(&sidecar).await {
944            Ok(path) => path,
945            Err(diagnostic) => {
946                diagnostics.push(diagnostic);
947                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
948                graph_moving_aborted = true;
949                continue;
950            }
951        };
952        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_GRAPH_DELETE) {
953            // Simulated crash before removal: row 8 retires the intent and
954            // the still-valid approval lets a later run retry.
955            diagnostics.push(diagnostic);
956            failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
957            graph_moving_aborted = true;
958            continue;
959        }
960        // Prefix delete through the storage layer: remove_dir_all locally,
961        // list+delete on object stores (idempotent; already-gone is fine).
962        match backend.delete_graph_root(&graph_uri).await {
963            Ok(()) => {}
964            Err(err) => {
965                diagnostics.push(Diagnostic::error(
966                    "graph_delete_failed",
967                    graph_addr.clone(),
968                    format!("could not remove graph root '{graph_uri}': {err}"),
969                ));
970                failed_graphs.insert(graph_id.clone(), FailedGraphOrigin::GraphDelete);
971                graph_moving_aborted = true;
972                continue;
973            }
974        }
975        // Crash point: the root is gone, the ledger does not record it yet.
976        // The sweep rolls forward (row 7b) and consumes the approval.
977        if let Err(diagnostic) = failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_AFTER_GRAPH_DELETE) {
978            diagnostics.push(diagnostic);
979            return early_return(
980                display_path(&desired.config_dir),
981                Some(desired.config_digest),
982                observations,
983                changes,
984                state.resource_statuses,
985                diagnostics,
986            );
987        }
988        executed_deletes.push((graph_id.clone(), approval_id.clone()));
989        if let Some(approval_id) = approval_id {
990            consumed_approval_ids.push(approval_id);
991        }
992        completed_op_sidecars.push(sidecar_path);
993    }
994    if !failed_graphs.is_empty() {
995        demote_dependents_of_failed_graphs(&mut changes, &failed_graphs, &desired.dependencies);
996    }
997
998    // State mutation. Apply owns query/policy statuses only; graph/schema
999    // statuses belong to refresh/import observation and must not be clobbered
1000    // (the sweep above is the one exception: it owns recovery statuses).
1001    let mut new_state = state.clone();
1002    for change in &changes {
1003        match change.disposition {
1004            Some(ApplyDisposition::Applied) => match change.operation {
1005                PlanOperation::Create | PlanOperation::Update => {
1006                    new_state.applied_revision.resources.insert(
1007                        change.resource.clone(),
1008                        StateResource {
1009                            digest: change
1010                                .after_digest
1011                                .clone()
1012                                .expect("create/update always carries an after digest"),
1013                            // Policies record their applied bindings so the
1014                            // ledger is serving-sufficient (RFC-005 §D3).
1015                            applies_to: desired.policy_bindings.get(&change.resource).cloned(),
1016                            embedding_provider: None,
1017                            embedding_profile: desired
1018                                .embedding_providers
1019                                .get(&change.resource)
1020                                .cloned(),
1021                        },
1022                    );
1023                    set_resource_status_applied(&mut new_state, &change.resource);
1024                }
1025                PlanOperation::Delete => {
1026                    new_state
1027                        .applied_revision
1028                        .resources
1029                        .remove(&change.resource);
1030                    new_state.resource_statuses.remove(&change.resource);
1031                }
1032            },
1033            Some(ApplyDisposition::Blocked) => {
1034                // The sweep owns recovery statuses (Drifted/Error with their
1035                // conditions); a generic Blocked must not clobber them.
1036                if change.reason.as_deref() != Some("cluster_recovery_pending") {
1037                    set_resource_status(
1038                        &mut new_state,
1039                        &change.resource,
1040                        ResourceLifecycleStatus::Blocked,
1041                        change.reason.as_deref().unwrap_or("dependency_not_applied"),
1042                        "waiting on an unapplied or missing dependency",
1043                    );
1044                }
1045            }
1046            _ => {}
1047        }
1048    }
1049    for (graph_id, approval_id) in &executed_deletes {
1050        tombstone_graph_subtree(
1051            &mut new_state,
1052            graph_id,
1053            approval_id.as_deref(),
1054            options.actor.as_deref(),
1055        );
1056        if let Some(approval_id) = approval_id {
1057            record_approval_consumed(&mut new_state, approval_id, "apply");
1058        }
1059    }
1060    recompute_state_graph_digests(&mut new_state, &desired);
1061
1062    let mut residual = diff_resources(
1063        &state_resource_digests(&new_state),
1064        &desired.resource_digests,
1065    );
1066    append_policy_binding_changes(&mut residual, Some(&new_state), &desired);
1067    append_embedding_profile_changes(&mut residual, Some(&new_state), &desired);
1068    let converged = residual.is_empty();
1069    if converged {
1070        new_state.applied_revision.config_digest = Some(desired.config_digest.clone());
1071    }
1072
1073    let after_value =
1074        serde_json::to_value(&new_state).expect("cluster state must serialize deterministically");
1075    let mut state_written = false;
1076    let mut state_write_failed = false;
1077    if after_value != before_value {
1078        new_state.state_revision = new_state.state_revision.saturating_add(1);
1079        // The failpoint error routes through state_write_failed so the
1080        // persisted-statuses revert contract below is exercised; a cfg_callback
1081        // on this point can mutate state.json to simulate a concurrent writer,
1082        // making write_state's CAS check fail organically.
1083        let write_result = match failpoints::maybe_fail(crate::failpoints::names::CLUSTER_APPLY_BEFORE_STATE_WRITE) {
1084            Ok(()) => {
1085                backend
1086                    .write_state(&new_state, expected_cas.as_deref(), &mut observations)
1087                    .await
1088            }
1089            Err(diagnostic) => Err(diagnostic),
1090        };
1091        match write_result {
1092            Ok(()) => state_written = true,
1093            Err(diagnostic) => {
1094                diagnostics.push(diagnostic);
1095                state_write_failed = true;
1096            }
1097        }
1098    }
1099    // Completed (rows 2/4) sweep sidecars are deleted only once their outcome
1100    // is durably recorded; on a failed write they stay and re-sweep next run.
1101    if !state_write_failed {
1102        for sidecar_uri in sweep
1103            .completed_sidecars
1104            .iter()
1105            .chain(completed_op_sidecars.iter())
1106        {
1107            backend.delete_object(sidecar_uri).await;
1108        }
1109        let mut all_consumed = sweep.consumed_approvals.clone();
1110        all_consumed.extend(consumed_approval_ids.iter().cloned());
1111        mark_approvals_consumed(&backend, &all_consumed).await;
1112    }
1113    // On a failed state write, report the statuses that are actually on disk
1114    // (the pre-apply snapshot), not the in-memory mutations that were never
1115    // persisted — automation reading `resource_statuses` independently of `ok`
1116    // must not see phantom status updates.
1117    let resource_statuses = if state_write_failed {
1118        state.resource_statuses
1119    } else {
1120        new_state.resource_statuses
1121    };
1122
1123    let applied_count = changes
1124        .iter()
1125        .filter(|change| change.disposition == Some(ApplyDisposition::Applied))
1126        .count();
1127    let deferred_count = changes
1128        .iter()
1129        .filter(|change| {
1130            matches!(
1131                change.disposition,
1132                Some(ApplyDisposition::Deferred) | Some(ApplyDisposition::Blocked)
1133            )
1134        })
1135        .count();
1136
1137    ApplyOutput {
1138        ok: !has_errors(&diagnostics),
1139        config_dir: display_path(&desired.config_dir),
1140        actor: options.actor.clone(),
1141        desired_revision: DesiredRevision {
1142            config_digest: Some(desired.config_digest),
1143        },
1144        state_observations: observations,
1145        changes,
1146        applied_count,
1147        deferred_count,
1148        converged,
1149        state_written,
1150        resource_statuses,
1151        diagnostics,
1152    }
1153}
1154
1155/// Record a digest-bound human approval for a gated (irreversible) change —
1156/// today: graph deletes. The artifact binds to the exact desired config
1157/// digest and the change's before/after digests, so config or state drift
1158/// invalidates it automatically (a stale approval can never authorize a
1159/// different change).
1160pub async fn approve_config_dir(
1161    config_dir: impl AsRef<Path>,
1162    resource: &str,
1163    approved_by: &str,
1164) -> ApproveOutput {
1165    let outcome = load_desired(config_dir.as_ref());
1166    let mut diagnostics = outcome.diagnostics;
1167    let storage_root = outcome
1168        .desired
1169        .as_ref()
1170        .and_then(|desired| desired.storage_root.clone());
1171    let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
1172        Ok(backend) => backend,
1173        Err(diagnostic) => {
1174            diagnostics.push(diagnostic);
1175            ClusterStore::for_config_dir(&outcome.config_dir)
1176        }
1177    };
1178    let mut observations = backend.observations();
1179
1180    let fail = |config_dir: String, diagnostics: Vec<Diagnostic>| ApproveOutput {
1181        ok: false,
1182        config_dir,
1183        approval_id: None,
1184        resource: None,
1185        operation: None,
1186        approved_by: None,
1187        diagnostics,
1188    };
1189
1190    let Some(desired) = outcome.desired else {
1191        return fail(display_path(&outcome.config_dir), diagnostics);
1192    };
1193    if has_errors(&diagnostics) {
1194        return fail(display_path(&desired.config_dir), diagnostics);
1195    }
1196
1197    let _lock_guard = if desired.state_lock {
1198        match backend.acquire_lock("approve", &mut observations).await {
1199            Ok(guard) => Some(guard),
1200            Err(diagnostic) => {
1201                diagnostics.push(diagnostic);
1202                return fail(display_path(&desired.config_dir), diagnostics);
1203            }
1204        }
1205    } else {
1206        diagnostics.push(Diagnostic::warning(
1207            "state_lock_disabled",
1208            "state.lock",
1209            "state.lock is false; approve ran without acquiring the cluster state lock",
1210        ));
1211        None
1212    };
1213
1214    let state = match backend.read_state(&mut observations).await {
1215        Ok(snapshot) => match snapshot.state {
1216            Some(state) => state,
1217            None => {
1218                diagnostics.push(Diagnostic::error(
1219                    "state_missing",
1220                    CLUSTER_STATE_FILE,
1221                    "approve requires an existing state.json; run `cluster import` first",
1222                ));
1223                return fail(display_path(&desired.config_dir), diagnostics);
1224            }
1225        },
1226        Err(diagnostic) => {
1227            diagnostics.push(diagnostic);
1228            return fail(display_path(&desired.config_dir), diagnostics);
1229        }
1230    };
1231
1232    let prior_resources = state_resource_digests(&state);
1233    let changes = diff_resources(&prior_resources, &desired.resource_digests);
1234    let gates = compute_approvals(&changes, &BTreeSet::new());
1235    let Some(change) = changes.iter().find(|change| {
1236        change.resource == resource && gates.iter().any(|gate| gate.resource == resource)
1237    }) else {
1238        diagnostics.push(Diagnostic::error(
1239            "approval_not_required",
1240            resource,
1241            "no pending change for this resource requires approval (check `cluster plan`)",
1242        ));
1243        return fail(display_path(&desired.config_dir), diagnostics);
1244    };
1245
1246    let artifact = ApprovalArtifact {
1247        schema_version: 1,
1248        approval_id: Ulid::new().to_string(),
1249        resource: change.resource.clone(),
1250        operation: match change.operation {
1251            PlanOperation::Create => "create",
1252            PlanOperation::Update => "update",
1253            PlanOperation::Delete => "delete",
1254        }
1255        .to_string(),
1256        reason: gates
1257            .iter()
1258            .find(|gate| gate.resource == resource)
1259            .map(|gate| gate.reason.clone())
1260            .unwrap_or_default(),
1261        bound_config_digest: desired.config_digest.clone(),
1262        bound_before_digest: change.before_digest.clone(),
1263        bound_after_digest: change.after_digest.clone(),
1264        approved_by: approved_by.to_string(),
1265        created_at: now_rfc3339(),
1266        consumed_at: None,
1267        consumed_by_operation: None,
1268    };
1269    if let Err(diagnostic) = backend.write_approval_artifact(&artifact).await {
1270        diagnostics.push(diagnostic);
1271        return fail(display_path(&desired.config_dir), diagnostics);
1272    }
1273
1274    ApproveOutput {
1275        ok: !has_errors(&diagnostics),
1276        config_dir: display_path(&desired.config_dir),
1277        approval_id: Some(artifact.approval_id),
1278        resource: Some(artifact.resource),
1279        operation: Some(change.operation.clone()),
1280        approved_by: Some(artifact.approved_by),
1281        diagnostics,
1282    }
1283}
1284
1285pub async fn status_config_dir(config_dir: impl AsRef<Path>) -> StatusOutput {
1286    let parsed = parse_cluster_config(config_dir.as_ref());
1287    let mut diagnostics = parsed.diagnostics;
1288    let storage_root = parsed.raw.as_ref().and_then(|raw| {
1289        raw.storage
1290            .as_deref()
1291            .map(str::trim)
1292            .filter(|root| !root.is_empty())
1293            .map(|root| root.trim_end_matches('/').to_string())
1294    });
1295    let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
1296        Ok(backend) => backend,
1297        Err(diagnostic) => {
1298            diagnostics.push(diagnostic);
1299            ClusterStore::for_config_dir(&parsed.config_dir)
1300        }
1301    };
1302    let mut observations = backend.observations();
1303    backend
1304        .observe_lock(&mut observations, &mut diagnostics)
1305        .await;
1306    warn_pending_recovery_sidecars(&backend, &mut diagnostics).await;
1307
1308    let mut resource_digests = BTreeMap::new();
1309    let mut resource_statuses = BTreeMap::new();
1310    let mut state_observation_records = BTreeMap::new();
1311
1312    if let Some(raw) = parsed.raw.as_ref() {
1313        let _settings = validate_cluster_header(raw, &mut diagnostics);
1314        if !has_errors(&diagnostics) {
1315            match backend.read_state(&mut observations).await {
1316                Ok(snapshot) => {
1317                    if let Some(state) = snapshot.state {
1318                        // Read-only point-in-time catalog check: report the
1319                        // findings as diagnostics; persisting Drifted statuses
1320                        // is refresh's job. Status never writes state.
1321                        for (address, finding) in verify_catalog_payloads(&backend, &state).await {
1322                            diagnostics.push(payload_finding_diagnostic(&address, &finding));
1323                        }
1324                        resource_digests = state_resource_digests(&state);
1325                        resource_statuses = state.resource_statuses;
1326                        state_observation_records = state.observations;
1327                    } else {
1328                        diagnostics.push(Diagnostic::warning(
1329                            "state_missing",
1330                            CLUSTER_STATE_FILE,
1331                            "state.json is missing; no applied cluster revision has been recorded",
1332                        ));
1333                    }
1334                }
1335                Err(diagnostic) => diagnostics.push(diagnostic),
1336            }
1337        }
1338    }
1339
1340    StatusOutput {
1341        ok: !has_errors(&diagnostics),
1342        config_dir: display_path(&parsed.config_dir),
1343        state_observations: observations,
1344        resource_digests,
1345        resource_statuses,
1346        observations: state_observation_records,
1347        diagnostics,
1348    }
1349}
1350
1351pub async fn force_unlock_config_dir(
1352    config_dir: impl AsRef<Path>,
1353    lock_id: impl AsRef<str>,
1354) -> ForceUnlockOutput {
1355    let parsed = parse_cluster_config(config_dir.as_ref());
1356    let mut diagnostics = parsed.diagnostics;
1357    let storage_root = parsed.raw.as_ref().and_then(|raw| {
1358        raw.storage
1359            .as_deref()
1360            .map(str::trim)
1361            .filter(|root| !root.is_empty())
1362            .map(|root| root.trim_end_matches('/').to_string())
1363    });
1364    let backend = match store_for(&parsed.config_dir, storage_root.as_deref()) {
1365        Ok(backend) => backend,
1366        Err(diagnostic) => {
1367            diagnostics.push(diagnostic);
1368            ClusterStore::for_config_dir(&parsed.config_dir)
1369        }
1370    };
1371    let mut observations = backend.observations();
1372    let mut lock_removed = false;
1373
1374    if let Some(raw) = parsed.raw.as_ref() {
1375        let _settings = validate_cluster_header(raw, &mut diagnostics);
1376        if !has_errors(&diagnostics) {
1377            match backend
1378                .force_unlock(lock_id.as_ref(), &mut observations)
1379                .await
1380            {
1381                Ok(()) => lock_removed = true,
1382                Err(diagnostic) => diagnostics.push(diagnostic),
1383            }
1384        }
1385    }
1386
1387    ForceUnlockOutput {
1388        ok: !has_errors(&diagnostics),
1389        config_dir: display_path(&parsed.config_dir),
1390        state_observations: observations,
1391        lock_removed,
1392        diagnostics,
1393    }
1394}
1395
1396pub async fn refresh_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput {
1397    sync_config_dir(config_dir.as_ref(), StateSyncOperation::Refresh).await
1398}
1399
1400pub async fn import_config_dir(config_dir: impl AsRef<Path>) -> StateSyncOutput {
1401    sync_config_dir(config_dir.as_ref(), StateSyncOperation::Import).await
1402}
1403
1404async fn sync_config_dir(config_dir: &Path, operation: StateSyncOperation) -> StateSyncOutput {
1405    let outcome = load_desired(config_dir);
1406    let mut diagnostics = outcome.diagnostics;
1407    let storage_root = outcome
1408        .desired
1409        .as_ref()
1410        .and_then(|desired| desired.storage_root.clone());
1411    let backend = match store_for(&outcome.config_dir, storage_root.as_deref()) {
1412        Ok(backend) => backend,
1413        Err(diagnostic) => {
1414            diagnostics.push(diagnostic);
1415            ClusterStore::for_config_dir(&outcome.config_dir)
1416        }
1417    };
1418    let mut observations = backend.observations();
1419
1420    let Some(desired) = outcome.desired else {
1421        return StateSyncOutput {
1422            ok: false,
1423            operation,
1424            config_dir: display_path(&outcome.config_dir),
1425            state_observations: observations,
1426            resource_digests: BTreeMap::new(),
1427            resource_statuses: BTreeMap::new(),
1428            observations: BTreeMap::new(),
1429            diagnostics,
1430        };
1431    };
1432
1433    if has_errors(&diagnostics) {
1434        return StateSyncOutput {
1435            ok: false,
1436            operation,
1437            config_dir: display_path(&desired.config_dir),
1438            state_observations: observations,
1439            resource_digests: desired.resource_digests,
1440            resource_statuses: BTreeMap::new(),
1441            observations: BTreeMap::new(),
1442            diagnostics,
1443        };
1444    }
1445
1446    let operation_label = state_sync_operation_label(operation);
1447    let _lock_guard = if desired.state_lock {
1448        match backend
1449            .acquire_lock(operation_label, &mut observations)
1450            .await
1451        {
1452            Ok(guard) => Some(guard),
1453            Err(diagnostic) => {
1454                diagnostics.push(diagnostic);
1455                None
1456            }
1457        }
1458    } else {
1459        diagnostics.push(Diagnostic::warning(
1460            "state_lock_disabled",
1461            "state.lock",
1462            format!(
1463                "state.lock is false; {operation_label} wrote state without acquiring the cluster state lock"
1464            ),
1465        ));
1466        None
1467    };
1468
1469    if has_errors(&diagnostics) {
1470        return StateSyncOutput {
1471            ok: false,
1472            operation,
1473            config_dir: display_path(&desired.config_dir),
1474            state_observations: observations,
1475            resource_digests: desired.resource_digests,
1476            resource_statuses: BTreeMap::new(),
1477            observations: BTreeMap::new(),
1478            diagnostics,
1479        };
1480    }
1481
1482    let snapshot = match backend.read_state(&mut observations).await {
1483        Ok(snapshot) => snapshot,
1484        Err(diagnostic) => {
1485            diagnostics.push(diagnostic);
1486            return StateSyncOutput {
1487                ok: false,
1488                operation,
1489                config_dir: display_path(&desired.config_dir),
1490                state_observations: observations,
1491                resource_digests: desired.resource_digests,
1492                resource_statuses: BTreeMap::new(),
1493                observations: BTreeMap::new(),
1494                diagnostics,
1495            };
1496        }
1497    };
1498
1499    let expected_cas = snapshot.state_cas;
1500    let mut state = match (operation, snapshot.state) {
1501        (StateSyncOperation::Refresh, Some(state)) => state,
1502        (StateSyncOperation::Refresh, None) => {
1503            diagnostics.push(Diagnostic::error(
1504                "state_missing",
1505                CLUSTER_STATE_FILE,
1506                "refresh requires an existing state.json; run `cluster import` to bootstrap state",
1507            ));
1508            return StateSyncOutput {
1509                ok: false,
1510                operation,
1511                config_dir: display_path(&desired.config_dir),
1512                state_observations: observations,
1513                resource_digests: BTreeMap::new(),
1514                resource_statuses: BTreeMap::new(),
1515                observations: BTreeMap::new(),
1516                diagnostics,
1517            };
1518        }
1519        (StateSyncOperation::Import, Some(state)) => {
1520            diagnostics.push(Diagnostic::error(
1521                "state_already_exists",
1522                CLUSTER_STATE_FILE,
1523                "import creates initial state only when state.json is missing; use `cluster refresh` for an existing state ledger",
1524            ));
1525            return StateSyncOutput {
1526                ok: false,
1527                operation,
1528                config_dir: display_path(&desired.config_dir),
1529                state_observations: observations,
1530                resource_digests: state_resource_digests(&state),
1531                resource_statuses: state.resource_statuses,
1532                observations: state.observations,
1533                diagnostics,
1534            };
1535        }
1536        (StateSyncOperation::Import, None) => initial_import_state(&desired),
1537    };
1538
1539    // Recovery sweep first (RFC-004 §D3): classify any interrupted graph
1540    // operation before observation/verification so a rolled-forward outcome
1541    // is what those passes see.
1542    let sweep = sweep_recovery_sidecars(&backend, &mut state, &mut diagnostics).await;
1543
1544    // Catalog payload verification must run BEFORE graph observation: removing
1545    // a drifted query digest first means the live-graph composite recompute
1546    // below already excludes it, so the persisted graph.<id> composite stays
1547    // consistent and the next plan shows exactly the create + derived update.
1548    for (address, finding) in verify_catalog_payloads(&backend, &state).await {
1549        diagnostics.push(payload_finding_diagnostic(&address, &finding));
1550        match finding {
1551            PayloadFinding::Missing => {
1552                state.applied_revision.resources.remove(&address);
1553                set_resource_status(
1554                    &mut state,
1555                    &address,
1556                    ResourceLifecycleStatus::Drifted,
1557                    "payload_missing",
1558                    "catalog payload blob is missing; re-run `cluster apply` to republish",
1559                );
1560            }
1561            PayloadFinding::Mismatch { .. } => {
1562                state.applied_revision.resources.remove(&address);
1563                set_resource_status(
1564                    &mut state,
1565                    &address,
1566                    ResourceLifecycleStatus::Drifted,
1567                    "payload_mismatch",
1568                    "catalog payload blob does not match the recorded digest; re-run `cluster apply` to republish",
1569                );
1570            }
1571            // Transient IO must not trigger a spurious republish: keep the
1572            // digest, surface the error, let a later clean refresh converge.
1573            PayloadFinding::ReadError(error) => {
1574                set_resource_status(
1575                    &mut state,
1576                    &address,
1577                    ResourceLifecycleStatus::Error,
1578                    "payload_read_error",
1579                    &error,
1580                );
1581            }
1582        }
1583    }
1584
1585    let graph_error_count = observe_declared_graphs(&desired, &backend, &mut state).await;
1586    if graph_error_count > 0 {
1587        diagnostics.push(Diagnostic::error(
1588            "graph_observation_error",
1589            CLUSTER_GRAPHS_DIR,
1590            format!("{graph_error_count} graph observation(s) failed"),
1591        ));
1592    }
1593
1594    if operation == StateSyncOperation::Import && has_errors(&diagnostics) {
1595        return StateSyncOutput {
1596            ok: false,
1597            operation,
1598            config_dir: display_path(&desired.config_dir),
1599            state_observations: observations,
1600            resource_digests: state_resource_digests(&state),
1601            resource_statuses: state.resource_statuses,
1602            observations: state.observations,
1603            diagnostics,
1604        };
1605    }
1606
1607    if operation == StateSyncOperation::Import {
1608        state.state_revision = 1;
1609    } else {
1610        state.state_revision = state.state_revision.saturating_add(1);
1611    }
1612
1613    match backend
1614        .write_state(&state, expected_cas.as_deref(), &mut observations)
1615        .await
1616    {
1617        Ok(()) => {
1618            // Completed sweep sidecars are deleted only after their outcome
1619            // is durably recorded; on failure they stay and re-sweep.
1620            for sidecar_uri in &sweep.completed_sidecars {
1621                backend.delete_object(sidecar_uri).await;
1622            }
1623            mark_approvals_consumed(&backend, &sweep.consumed_approvals).await;
1624        }
1625        Err(diagnostic) => diagnostics.push(diagnostic),
1626    }
1627
1628    let resource_digests = state_resource_digests(&state);
1629    let ok = !has_errors(&diagnostics);
1630
1631    StateSyncOutput {
1632        ok,
1633        operation,
1634        config_dir: display_path(&desired.config_dir),
1635        state_observations: observations,
1636        resource_digests,
1637        resource_statuses: state.resource_statuses,
1638        observations: state.observations,
1639        diagnostics,
1640    }
1641}
1642
/// Problem found while checking one catalog resource digest against its
/// stored payload blob. A blob that hashes to its recorded digest yields no
/// finding at all, so every variant describes something wrong.
#[derive(PartialEq, Eq, Debug)]
enum PayloadFinding {
    /// No blob exists for the recorded digest.
    Missing,
    /// The blob exists but its contents hash to a different digest.
    Mismatch {
        /// Hex SHA-256 of the blob's actual contents.
        actual_digest: String,
    },
    /// The blob could not be read; carries the store's error message.
    ReadError(String),
}
1649
1650/// Verify every catalog-backed resource digest in state against its
1651/// content-addressed blob under `__cluster/resources/`. Graph, schema, and
1652/// unknown addresses have no payloads and are skipped. Read-only; findings
1653/// are deterministic (BTreeMap order). Payloads are small (queries, policy
1654/// bundles), so a full digest re-hash is cheap.
1655async fn verify_catalog_payloads(
1656    backend: &ClusterStore,
1657    state: &ClusterState,
1658) -> Vec<(String, PayloadFinding)> {
1659    let mut findings = Vec::new();
1660    for (address, resource) in &state.applied_revision.resources {
1661        let kind = resource_kind(address);
1662        if ClusterStore::payload_relative(&kind, &resource.digest).is_none() {
1663            continue;
1664        }
1665        match backend.read_payload(&kind, &resource.digest).await {
1666            Ok(Some(text)) => {
1667                let actual_digest = sha256_hex(text.as_bytes());
1668                if actual_digest != resource.digest {
1669                    findings.push((address.clone(), PayloadFinding::Mismatch { actual_digest }));
1670                }
1671            }
1672            Ok(None) => findings.push((address.clone(), PayloadFinding::Missing)),
1673            Err(err) => {
1674                findings.push((address.clone(), PayloadFinding::ReadError(err)));
1675            }
1676        }
1677    }
1678    findings
1679}
1680
1681fn payload_finding_diagnostic(address: &str, finding: &PayloadFinding) -> Diagnostic {
1682    match finding {
1683        PayloadFinding::Missing => Diagnostic::warning(
1684            "catalog_payload_missing",
1685            address,
1686            "catalog payload blob is missing; re-run `cluster apply` to republish",
1687        ),
1688        PayloadFinding::Mismatch { actual_digest } => Diagnostic::warning(
1689            "catalog_payload_mismatch",
1690            address,
1691            format!(
1692                "catalog payload blob does not match the recorded digest (actual sha256:{actual_digest}); re-run `cluster apply` to republish"
1693            ),
1694        ),
1695        // An unverifiable blob must not report healthy.
1696        PayloadFinding::ReadError(error) => {
1697            Diagnostic::error("catalog_payload_read_error", address, error.clone())
1698        }
1699    }
1700}
1701
1702/// Write one content-addressed payload blob. Idempotent: an existing
1703/// digest-named file is trusted as-is. The digest re-check is the apply-side
1704/// TOCTOU detector — the source file changing between `load_desired` and the
1705/// payload write must fail loudly, never publish mismatched content.
1706async fn write_resource_payload(
1707    backend: &ClusterStore,
1708    kind: &ResourceKind,
1709    source: &Path,
1710    expected_digest: &str,
1711    resource: &str,
1712) -> Result<(), Diagnostic> {
1713    if backend.payload_exists(kind, expected_digest).await {
1714        // Content-addressed: an existing digest-named object is identical.
1715        return Ok(());
1716    }
1717    let bytes = fs::read(source).map_err(|err| {
1718        Diagnostic::error(
1719            "resource_payload_write_error",
1720            resource,
1721            format!(
1722                "could not read resource source '{}': {err}",
1723                source.display()
1724            ),
1725        )
1726    })?;
1727    if sha256_hex(&bytes) != expected_digest {
1728        // The apply-side TOCTOU detector: the source changing between
1729        // load_desired and this write must fail loudly, never publish
1730        // mismatched content.
1731        return Err(Diagnostic::error(
1732            "resource_content_changed",
1733            resource,
1734            format!(
1735                "resource source '{}' changed while apply was running; re-run `cluster apply`",
1736                source.display()
1737            ),
1738        ));
1739    }
1740    let content = String::from_utf8(bytes).map_err(|err| {
1741        Diagnostic::error(
1742            "resource_payload_write_error",
1743            resource,
1744            format!("resource source is not valid UTF-8: {err}"),
1745        )
1746    })?;
1747    backend
1748        .write_payload(kind, expected_digest, &content)
1749        .await
1750        .map_err(|err| {
1751            Diagnostic::error(
1752                "resource_payload_write_error",
1753                resource,
1754                format!("could not write payload: {err}"),
1755            )
1756        })
1757}
1758
1759/// Recompute the composite `graph.<id>` digests for state-resident graphs from
1760/// state's own schema/query components. Without this, an applied query change
1761/// would leave the prior composite digest in state and `graph.<id>` would show
1762/// a phantom update in every later plan — apply could never converge.
1763fn recompute_state_graph_digests(state: &mut ClusterState, desired: &DesiredCluster) {
1764    for graph in &desired.graphs {
1765        let graph_address = graph_address(&graph.id);
1766        if !state
1767            .applied_revision
1768            .resources
1769            .contains_key(&graph_address)
1770        {
1771            continue;
1772        }
1773        let schema_digest = state
1774            .applied_revision
1775            .resources
1776            .get(&schema_address(&graph.id))
1777            .map(|resource| resource.digest.clone());
1778        let query_digests = state_query_digests_for_graph(state, &graph.id);
1779        let embedding_provider = graph.embedding_provider.as_deref();
1780        let embedding_provider_digest = embedding_provider
1781            .and_then(|address| state.applied_revision.resources.get(address))
1782            .map(|resource| resource.digest.clone());
1783        let digest = graph_digest(
1784            &graph.id,
1785            schema_digest.as_ref(),
1786            Some(&query_digests),
1787            embedding_provider,
1788            embedding_provider_digest.as_ref(),
1789        );
1790        state.applied_revision.resources.insert(
1791            graph_address,
1792            StateResource {
1793                digest,
1794                applies_to: None,
1795                embedding_provider: graph.embedding_provider.clone(),
1796                embedding_profile: None,
1797            },
1798        );
1799    }
1800}
1801
1802fn duplicate_key_diagnostics(text: &str) -> Vec<Diagnostic> {
1803    #[derive(Debug)]
1804    struct Frame {
1805        indent: isize,
1806        path: String,
1807        keys: BTreeSet<String>,
1808    }
1809
1810    let mut diagnostics = Vec::new();
1811    let mut stack = vec![Frame {
1812        indent: -1,
1813        path: String::new(),
1814        keys: BTreeSet::new(),
1815    }];
1816
1817    for (line_idx, line) in text.lines().enumerate() {
1818        let line_without_comment = strip_comment(line);
1819        if line_without_comment.trim().is_empty() {
1820            continue;
1821        }
1822        let indent = line_without_comment
1823            .chars()
1824            .take_while(|ch| *ch == ' ')
1825            .count() as isize;
1826        let trimmed = line_without_comment.trim_start();
1827        if trimmed.starts_with('-') {
1828            continue;
1829        }
1830        let Some((raw_key, raw_value)) = trimmed.split_once(':') else {
1831            continue;
1832        };
1833        let key = raw_key.trim();
1834        if key.is_empty() || key.starts_with('{') || key.starts_with('[') {
1835            continue;
1836        }
1837
1838        while stack.last().is_some_and(|frame| indent <= frame.indent) {
1839            stack.pop();
1840        }
1841        let parent = stack.last_mut().expect("root frame is always present");
1842        let full_path = if parent.path.is_empty() {
1843            key.to_string()
1844        } else {
1845            format!("{}.{}", parent.path, key)
1846        };
1847        if !parent.keys.insert(key.to_string()) {
1848            diagnostics.push(Diagnostic::error(
1849                "duplicate_yaml_key",
1850                full_path.clone(),
1851                format!("duplicate YAML key `{key}` on line {}", line_idx + 1),
1852            ));
1853        }
1854        if raw_value.trim().is_empty() {
1855            stack.push(Frame {
1856                indent,
1857                path: full_path,
1858                keys: BTreeSet::new(),
1859            });
1860        }
1861    }
1862
1863    diagnostics
1864}
1865
/// Return `line` with any trailing YAML comment removed.
///
/// Applies the YAML comment rules that a single-line scan can check:
///
/// * `#` starts a comment only at the start of the line or right after
///   whitespace, so `http://host/#frag` and `C#` are kept intact.
/// * `#` inside a quoted scalar is content. A quote opens a quoted scalar only
///   where a scalar can begin: at line start, after whitespace, or after a
///   `[`, `{`, or `,` flow indicator. An apostrophe inside a plain scalar such
///   as `don't` therefore does not hide the comment that follows it.
/// * Inside single quotes, `''` is an escaped quote. Inside double quotes, a
///   backslash escapes the next character.
///
/// Quote state is tracked per line only, so a quoted scalar continued onto the
/// next line is not recognized. Everything before the comment marker,
/// including trailing whitespace, is returned unchanged. A line without a
/// comment is returned whole.
fn strip_comment(line: &str) -> String {
    let mut in_single_quote = false;
    let mut in_double_quote = false;
    let mut escaped = false;
    let mut previous: Option<char> = None;
    let mut chars = line.char_indices().peekable();

    while let Some((idx, ch)) = chars.next() {
        if in_double_quote {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == '"' {
                in_double_quote = false;
            }
        } else if in_single_quote {
            if ch == '\'' {
                if chars.peek().is_some_and(|&(_, next)| next == '\'') {
                    // `''` is an escaped quote: consume both and stay quoted.
                    chars.next();
                } else {
                    in_single_quote = false;
                }
            }
        } else {
            let after_space = match previous {
                None => true,
                Some(prev) => prev.is_whitespace(),
            };
            let scalar_can_start = after_space || matches!(previous, Some('[' | '{' | ','));
            match ch {
                '#' if after_space => return line[..idx].to_string(),
                '\'' if scalar_can_start => in_single_quote = true,
                '"' if scalar_can_start => in_double_quote = true,
                _ => {}
            }
        }
        previous = Some(ch);
    }

    line.to_string()
}
1887
1888fn state_query_digests_for_graph(state: &ClusterState, graph_id: &str) -> BTreeMap<String, String> {
1889    let prefix = format!("query.{graph_id}.");
1890    state
1891        .applied_revision
1892        .resources
1893        .iter()
1894        .filter_map(|(address, resource)| {
1895            address
1896                .strip_prefix(&prefix)
1897                .map(|name| (name.to_string(), resource.digest.clone()))
1898        })
1899        .collect()
1900}
1901
1902fn state_graph_embedding_provider(state: &ClusterState, graph_id: &str) -> Option<String> {
1903    state
1904        .applied_revision
1905        .resources
1906        .get(&graph_address(graph_id))
1907        .and_then(|resource| resource.embedding_provider.clone())
1908}
1909
1910fn state_embedding_provider_digest(
1911    state: &ClusterState,
1912    embedding_provider: Option<&str>,
1913) -> Option<String> {
1914    embedding_provider
1915        .and_then(|address| state.applied_revision.resources.get(address))
1916        .map(|resource| resource.digest.clone())
1917}
1918
1919fn set_resource_status_applied(state: &mut ClusterState, address: &str) {
1920    state.resource_statuses.insert(
1921        address.to_string(),
1922        ResourceStatusRecord {
1923            status: ResourceLifecycleStatus::Applied,
1924            conditions: Vec::new(),
1925            message: None,
1926        },
1927    );
1928}
1929
1930fn set_resource_status(
1931    state: &mut ClusterState,
1932    address: &str,
1933    status: ResourceLifecycleStatus,
1934    condition: &str,
1935    message: &str,
1936) {
1937    state.resource_statuses.insert(
1938        address.to_string(),
1939        ResourceStatusRecord {
1940            status,
1941            conditions: vec![condition.to_string()],
1942            message: Some(message.to_string()),
1943        },
1944    );
1945}
1946
1947fn graph_digest(
1948    graph_id: &str,
1949    schema_digest: Option<&String>,
1950    query_digests: Option<&BTreeMap<String, String>>,
1951    embedding_provider: Option<&str>,
1952    embedding_provider_digest: Option<&String>,
1953) -> String {
1954    let mut input = format!(
1955        "graph\0{graph_id}\0schema\0{}\0",
1956        schema_digest.map_or("", String::as_str)
1957    );
1958    if let Some(query_digests) = query_digests {
1959        for (name, digest) in query_digests {
1960            input.push_str("query\0");
1961            input.push_str(name);
1962            input.push('\0');
1963            input.push_str(digest);
1964            input.push('\0');
1965        }
1966    }
1967    if let Some(provider) = embedding_provider {
1968        input.push_str("embedding_provider\0");
1969        input.push_str(provider);
1970        input.push('\0');
1971        input.push_str(embedding_provider_digest.map_or("", String::as_str));
1972        input.push('\0');
1973    }
1974    sha256_hex(input.as_bytes())
1975}
1976
1977fn embedding_provider_digest(profile: &EmbeddingProviderConfig) -> String {
1978    let mut input = String::from("embedding-provider\0");
1979    let config_semantics =
1980        serde_json::to_string(profile).expect("embedding provider config must serialize");
1981    input.push_str(&config_semantics);
1982    sha256_hex(input.as_bytes())
1983}
1984
1985async fn live_schema_matches_recorded_digest(
1986    graph_uri: &str,
1987    recorded_schema_digest: Option<&str>,
1988    observed_manifest_version: Option<u64>,
1989) -> bool {
1990    let Some(recorded_schema_digest) = recorded_schema_digest else {
1991        return false;
1992    };
1993    let Some(observed_manifest_version) = observed_manifest_version else {
1994        return false;
1995    };
1996    let Ok(db) = Omnigraph::open_read_only(graph_uri).await else {
1997        return false;
1998    };
1999    let Ok(snapshot) = db.snapshot_of(ReadTarget::branch("main")).await else {
2000        return false;
2001    };
2002    if snapshot.version() != observed_manifest_version {
2003        return false;
2004    }
2005    sha256_hex(db.schema_source().as_bytes()) == recorded_schema_digest
2006}
2007
2008fn desired_config_digest(
2009    raw: &RawClusterConfig,
2010    resource_digests: &BTreeMap<String, String>,
2011) -> String {
2012    let mut input = String::from("cluster-config\0");
2013    // Hash parsed semantics, not raw YAML bytes, so comments and formatting do
2014    // not create a new desired revision and the digest cannot drift from parse.
2015    let config_semantics =
2016        serde_json::to_string(raw).expect("raw cluster config must serialize deterministically");
2017    input.push_str(&config_semantics);
2018    input.push('\0');
2019    for (address, digest) in resource_digests {
2020        input.push_str(address);
2021        input.push('\0');
2022        input.push_str(digest);
2023        input.push('\0');
2024    }
2025    sha256_hex(input.as_bytes())
2026}
2027
2028fn sha256_hex(bytes: &[u8]) -> String {
2029    let digest = Sha256::digest(bytes);
2030    const HEX: &[u8; 16] = b"0123456789abcdef";
2031    let mut out = String::with_capacity(digest.len() * 2);
2032    for byte in digest {
2033        out.push(HEX[(byte >> 4) as usize] as char);
2034        out.push(HEX[(byte & 0x0f) as usize] as char);
2035    }
2036    out
2037}
2038
2039fn now_rfc3339() -> String {
2040    OffsetDateTime::now_utc()
2041        .format(&Rfc3339)
2042        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2043}
2044
2045fn lock_age_seconds(created_at: &str) -> Option<u64> {
2046    let created_at = OffsetDateTime::parse(created_at, &Rfc3339).ok()?;
2047    Some(
2048        (OffsetDateTime::now_utc() - created_at)
2049            .whole_seconds()
2050            .max(0) as u64,
2051    )
2052}
2053
2054fn state_sync_operation_label(operation: StateSyncOperation) -> &'static str {
2055    match operation {
2056        StateSyncOperation::Refresh => "refresh",
2057        StateSyncOperation::Import => "import",
2058    }
2059}
2060
2061fn has_errors(diagnostics: &[Diagnostic]) -> bool {
2062    diagnostics
2063        .iter()
2064        .any(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
2065}
2066
2067fn count_errors(diagnostics: &[Diagnostic]) -> usize {
2068    diagnostics
2069        .iter()
2070        .filter(|diagnostic| diagnostic.severity == DiagnosticSeverity::Error)
2071        .count()
2072}
2073
/// Render `path` as an owned display string.
///
/// Uses `Path::display`, which may replace non-UTF-8 data lossily.
fn display_path(path: &Path) -> String {
    let rendered = path.display();
    rendered.to_string()
}
2077
// Unit tests are kept in the sibling `tests.rs` file (mapped by the `#[path]`
// attribute) and compiled only for test builds.
#[cfg(test)]
#[path = "tests.rs"]
mod tests;