Skip to main content

omnigraph_cluster/
serve.rs

1//! Phase-5 serving snapshot: the read-only loader a `--cluster` server
2//! boots from (moved verbatim from lib.rs in the modularization).
3
4use super::*;
5
6/// One graph in a serving snapshot: its id and on-disk root.
7#[derive(Debug, Clone)]
8pub struct ServingGraph {
9    pub graph_id: String,
10    pub root: PathBuf,
11    pub embedding: Option<EmbeddingProviderConfig>,
12}
13
/// A stored query ready to serve: which graph it is bound to, the name it is
/// registered under, and its digest-verified source text.
#[derive(Debug, Clone)]
pub struct ServingQuery {
    /// Id of the graph this query runs against.
    pub graph_id: String,
    /// Registry name of the query within that graph.
    pub name: String,
    /// Query source, already verified against the applied digest.
    pub source: String,
}
21
/// One policy bundle: its digest-verified catalog content and applied
/// bindings (normalized typed refs: `cluster` | `graph.<id>`).
#[derive(Debug, Clone)]
pub struct ServingPolicy {
    /// Registry name of the policy bundle.
    pub name: String,
    /// The policy bundle CONTENT, digest-verified against the applied
    /// revision at read time. Content, not a path: the catalog may live on
    /// object storage, and the server must not re-read mutable state.
    pub source: String,
    /// Binding refs the bundle applies to (`cluster` or `graph.<id>`).
    pub applies_to: Vec<String>,
}
33
34/// Everything a server needs to boot from the cluster catalog (RFC-005 §D2).
35#[derive(Debug, Clone)]
36pub struct ServingSnapshot {
37    pub graphs: Vec<ServingGraph>,
38    pub queries: Vec<ServingQuery>,
39    pub policies: Vec<ServingPolicy>,
40    pub diagnostics: Vec<Diagnostic>,
41}
42
43/// Read the applied revision as a serving snapshot — the read-only loader for
44/// the Phase-5 server boot. Cluster-global readiness failures are still
45/// all-or-nothing, but graph-attributed pending recovery sidecars quarantine
46/// only that graph so healthy graphs can continue serving. This loader never
47/// runs a recovery sweep.
48/// Takes no lock: the state file is replaced atomically, so this reads a
49/// consistent point-in-time ledger.
50pub async fn read_serving_snapshot(
51    config_dir: impl AsRef<Path>,
52) -> Result<ServingSnapshot, Vec<Diagnostic>> {
53    let config_dir = config_dir.as_ref().to_path_buf();
54    // The declared storage: root decides where the ledger/catalog/graphs
55    // live; config parse errors surface through the normal validation path.
56    let parsed = parse_cluster_config(&config_dir);
57    let storage_root = parsed.raw.as_ref().and_then(|raw| {
58        raw.storage
59            .as_deref()
60            .map(str::trim)
61            .filter(|root| !root.is_empty())
62            .map(|root| root.trim_end_matches('/').to_string())
63    });
64    let backend = match storage_root.as_deref() {
65        Some(root) => match ClusterStore::for_storage_root(root) {
66            Ok(backend) => backend,
67            Err(diagnostic) => return Err(vec![diagnostic]),
68        },
69        None => ClusterStore::for_config_dir(&config_dir),
70    };
71    read_snapshot_with_store(backend).await
72}
73
74/// Read the applied revision directly from a storage root URI — config-free
75/// serving: a `--cluster s3://bucket/prefix` server needs no local files at
76/// all, only the bucket and credentials. The ledger and catalog ARE the
77/// deployment artifact.
78pub async fn read_serving_snapshot_from_storage(
79    storage_root: &str,
80) -> Result<ServingSnapshot, Vec<Diagnostic>> {
81    let backend =
82        ClusterStore::for_storage_root(storage_root).map_err(|diagnostic| vec![diagnostic])?;
83    read_snapshot_with_store(backend).await
84}
85
86/// Cluster root for a graph **storage URI** of the cluster layout
87/// (`<root>/graphs/<id>.omni`), if `<root>` is actually a cluster (holds
88/// `__cluster/state.json`); otherwise `None`. Used by the CLI to refuse
89/// `init` into a cluster-managed location — graphs there are created by
90/// `cluster apply`, not `init`.
91///
92/// Cheap by construction: a URI that does not match the `<root>/graphs/<id>.omni`
93/// shape returns `None` without any I/O, so ordinary `init` targets
94/// (`./kb.omni`, `s3://bucket/kb.omni`) never probe storage. Works for
95/// `file://` and `s3://` via the storage adapter.
96pub async fn cluster_root_for_graph_uri(graph_uri: &str) -> Option<String> {
97    let root = cluster_root_of_graph_layout(graph_uri)?;
98    let store = ClusterStore::for_storage_root(&root).ok()?;
99    store
100        .has_state()
101        .await
102        .then(|| store.display_root().to_string())
103}
104
105/// Resolve a graph's **storage URI** (`<root>/graphs/<id>.omni`) from a cluster's
106/// applied state ledger — the lightweight path for storage-plane maintenance
107/// (`optimize`/`repair`/`cleanup`).
108///
109/// Unlike [`read_serving_snapshot`], this deliberately does NOT validate catalog
110/// payloads or recovery readiness: maintenance only needs the derivable graph
111/// root, and must not be blocked by an unrelated corrupt policy/query blob or a
112/// pending recovery sweep — a degraded cluster is exactly when an operator
113/// reaches for `repair`. It reads the state ledger, confirms the graph is in the
114/// applied revision, and returns `graph_root(id)`.
115///
116/// `cluster` is a config directory or a storage-root URI (`s3://…`, config-free),
117/// mirroring the server's `--cluster` dispatch.
118pub async fn resolve_graph_storage_uri(cluster: &str, graph_id: &str) -> Result<String, Diagnostic> {
119    let backend = open_cluster_backend(cluster)?;
120    let mut observations = backend.observations();
121    let snapshot = backend.read_state(&mut observations).await?;
122    let state = snapshot.state.ok_or_else(|| missing_state_diagnostic(cluster))?;
123    let address = format!("graph.{graph_id}");
124    if !state.applied_revision.resources.contains_key(&address) {
125        let applied = applied_graph_ids(&state);
126        return Err(Diagnostic::error(
127            "graph_not_applied",
128            address,
129            format!(
130                "graph `{graph_id}` is not applied in cluster `{cluster}` (applied graphs: [{}]); \
131                 declare it in cluster.yaml and run `cluster apply`, or check the id",
132                applied.join(", ")
133            ),
134        ));
135    }
136    Ok(backend.graph_root(graph_id))
137}
138
139/// List the graph ids applied in a cluster's served state (sorted). Reads the
140/// ledger only — no catalog validation — like `resolve_graph_storage_uri`, so
141/// it works on a degraded cluster. Used to enumerate candidates when no
142/// `--graph` is selected (RFC-011 Decision 7).
143pub async fn cluster_graph_ids(cluster: &str) -> Result<Vec<String>, Diagnostic> {
144    let backend = open_cluster_backend(cluster)?;
145    let mut observations = backend.observations();
146    let snapshot = backend.read_state(&mut observations).await?;
147    let state = snapshot.state.ok_or_else(|| missing_state_diagnostic(cluster))?;
148    Ok(applied_graph_ids(&state))
149}
150
151fn open_cluster_backend(cluster: &str) -> Result<ClusterStore, Diagnostic> {
152    if cluster.contains("://") {
153        ClusterStore::for_storage_root(cluster)
154    } else {
155        Ok(ClusterStore::for_config_dir(Path::new(cluster)))
156    }
157}
158
159fn missing_state_diagnostic(cluster: &str) -> Diagnostic {
160    Diagnostic::error(
161        "cluster_state_missing",
162        CLUSTER_STATE_FILE,
163        format!("cluster `{cluster}` has no applied state; run `cluster apply` first"),
164    )
165}
166
167fn applied_graph_ids(state: &crate::types::ClusterState) -> Vec<String> {
168    let mut ids: Vec<String> = state
169        .applied_revision
170        .resources
171        .keys()
172        .filter_map(|a| a.strip_prefix("graph."))
173        .map(str::to_string)
174        .collect();
175    ids.sort();
176    ids
177}
178
/// Map `<root>/graphs/<id>.omni` to `<root>`, accepting only the exact
/// cluster graph-layout shape: a non-empty root and a single, non-empty
/// `<id>` segment with no nested path. Trailing slashes are ignored. Any other
/// input yields `None`. Pure string work, so no I/O happens for
/// non-cluster-shaped URIs.
fn cluster_root_of_graph_layout(graph_uri: &str) -> Option<String> {
    let stem = graph_uri.trim_end_matches('/').strip_suffix(".omni")?;
    match stem.rsplit_once("/graphs/") {
        Some((root, id)) if !root.is_empty() && !id.is_empty() && !id.contains('/') => {
            Some(root.to_owned())
        }
        _ => None,
    }
}
191
192async fn read_snapshot_with_store(
193    backend: ClusterStore,
194) -> Result<ServingSnapshot, Vec<Diagnostic>> {
195    let mut diagnostics: Vec<Diagnostic> = Vec::new();
196    let mut startup_diagnostics: Vec<Diagnostic> = Vec::new();
197    let mut quarantined_graphs: BTreeSet<String> = BTreeSet::new();
198
199    // Do not sweep at serve time. Valid graph-attributed sidecars quarantine
200    // that graph; malformed/unattributable sidecars remain cluster-fatal
201    // because serving cannot prove their blast radius.
202    let sidecar_diag_start = diagnostics.len();
203    let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
204    // Every diagnostic `list_recovery_sidecars` appends is a genuine
205    // read/parse/version failure (emitted as a warning by `store::list_json_dir`)
206    // whose blast radius serving cannot prove — promote each to a cluster-fatal
207    // error. This depends on that listing only ever emitting failure diagnostics;
208    // if it grows a benign/informational one, promote by code instead.
209    for diagnostic in diagnostics.iter_mut().skip(sidecar_diag_start) {
210        diagnostic.severity = DiagnosticSeverity::Error;
211    }
212    for (path, sidecar) in sidecars {
213        if sidecar.graph_id.trim().is_empty() {
214            diagnostics.push(Diagnostic::error(
215                "cluster_recovery_unattributed",
216                path,
217                "recovery sidecar has no graph id; run a state-mutating cluster command to sweep it before serving",
218            ));
219            continue;
220        }
221        quarantined_graphs.insert(sidecar.graph_id.clone());
222        startup_diagnostics.push(Diagnostic::warning(
223            "cluster_recovery_pending",
224            graph_address(&sidecar.graph_id),
225            format!(
226                "graph `{}` is quarantined because interrupted operation `{}` awaits recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep",
227                sidecar.graph_id, sidecar.operation_id
228            ),
229        ));
230    }
231    if has_errors(&diagnostics) {
232        return Err(diagnostics);
233    }
234
235    let mut observations = backend.observations();
236    let state = match backend.read_state(&mut observations).await {
237        Ok(snapshot) => match snapshot.state {
238            Some(state) => Some(state),
239            None => {
240                diagnostics.push(Diagnostic::error(
241                    "cluster_state_missing",
242                    CLUSTER_STATE_FILE,
243                    "no cluster state ledger; run `cluster import` and `cluster apply` first",
244                ));
245                None
246            }
247        },
248        Err(diagnostic) => {
249            diagnostics.push(diagnostic);
250            None
251        }
252    };
253    let Some(state) = state else {
254        diagnostics.extend(startup_diagnostics);
255        return Err(diagnostics);
256    };
257
258    let required_embedding_providers: BTreeSet<String> = state
259        .applied_revision
260        .resources
261        .iter()
262        .filter_map(|(address, entry)| match resource_kind(address) {
263            ResourceKind::Graph(graph_id) if !quarantined_graphs.contains(&graph_id) => {
264                entry.embedding_provider.clone()
265            }
266            _ => None,
267        })
268        .collect();
269    let mut embedding_profiles: BTreeMap<String, EmbeddingProviderConfig> = BTreeMap::new();
270    for (address, entry) in &state.applied_revision.resources {
271        if !matches!(resource_kind(address), ResourceKind::EmbeddingProvider(_)) {
272            continue;
273        }
274        if !required_embedding_providers.contains(address) {
275            continue;
276        }
277        let Some(profile) = entry.embedding_profile.clone() else {
278            diagnostics.push(Diagnostic::error(
279                "embedding_provider_profile_missing",
280                address.clone(),
281                "no applied embedding provider profile recorded; re-run `cluster apply` to backfill",
282            ));
283            continue;
284        };
285        let actual_digest = embedding_provider_digest(&profile);
286        if actual_digest != entry.digest {
287            diagnostics.push(Diagnostic::error(
288                "embedding_provider_digest_mismatch",
289                address.clone(),
290                format!(
291                    "applied embedding provider profile does not match its recorded digest (actual sha256:{actual_digest}); run `cluster refresh` then `cluster apply`, and restart"
292                ),
293            ));
294            continue;
295        }
296        embedding_profiles.insert(address.clone(), profile);
297    }
298
299    let mut graphs = Vec::new();
300    let mut queries = Vec::new();
301    let mut policies = Vec::new();
302    let mut saw_applied_graph = false;
303    for (address, entry) in &state.applied_revision.resources {
304        match resource_kind(address) {
305            ResourceKind::Graph(graph_id) => {
306                saw_applied_graph = true;
307                if quarantined_graphs.contains(&graph_id) {
308                    continue;
309                }
310                let embedding = match entry.embedding_provider.as_deref() {
311                    Some(provider_address) => match resource_kind(provider_address) {
312                        ResourceKind::EmbeddingProvider(_) => {
313                            match embedding_profiles.get(provider_address) {
314                                Some(profile) => Some(profile.clone()),
315                                None => {
316                                    diagnostics.push(Diagnostic::error(
317                                        "embedding_provider_missing",
318                                        address.clone(),
319                                        format!(
320                                            "graph references `{provider_address}`, but no applied embedding provider profile is available; re-run `cluster apply`"
321                                        ),
322                                    ));
323                                    None
324                                }
325                            }
326                        }
327                        _ => {
328                            diagnostics.push(Diagnostic::error(
329                                "wrong_kind_reference",
330                                address.clone(),
331                                format!(
332                                    "graph embedding_provider expects `provider.embedding.<name>`, got `{provider_address}`"
333                                ),
334                            ));
335                            None
336                        }
337                    },
338                    None => None,
339                };
340                graphs.push(ServingGraph {
341                    root: PathBuf::from(backend.graph_root(&graph_id)),
342                    graph_id,
343                    embedding,
344                });
345            }
346            ResourceKind::Schema(_) => {}
347            kind @ ResourceKind::Query { .. } => {
348                let ResourceKind::Query { graph, name } = &kind else {
349                    unreachable!()
350                };
351                if quarantined_graphs.contains(graph) {
352                    continue;
353                }
354                match backend
355                    .read_verified_payload(&kind, &entry.digest, address)
356                    .await
357                {
358                    Ok(source) => queries.push(ServingQuery {
359                        graph_id: graph.clone(),
360                        name: name.clone(),
361                        source,
362                    }),
363                    Err(diagnostic) => diagnostics.push(diagnostic),
364                }
365            }
366            kind @ ResourceKind::Policy(_) => {
367                let ResourceKind::Policy(name) = &kind else {
368                    unreachable!()
369                };
370                let Some(applies_to) = entry.applies_to.clone() else {
371                    diagnostics.push(Diagnostic::error(
372                        "policy_bindings_missing",
373                        address.clone(),
374                        "no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill",
375                    ));
376                    continue;
377                };
378                let applies_to: Vec<String> = applies_to
379                    .into_iter()
380                    .filter(|binding| {
381                        binding
382                            .strip_prefix("graph.")
383                            .is_none_or(|graph| !quarantined_graphs.contains(graph))
384                    })
385                    .collect();
386                if applies_to.is_empty() {
387                    continue;
388                }
389                match backend
390                    .read_verified_payload(&kind, &entry.digest, address)
391                    .await
392                {
393                    Ok(source) => policies.push(ServingPolicy {
394                        name: name.clone(),
395                        source,
396                        applies_to,
397                    }),
398                    Err(diagnostic) => diagnostics.push(diagnostic),
399                }
400            }
401            ResourceKind::EmbeddingProvider(_) => {}
402            ResourceKind::Unknown => {}
403        }
404    }
405
406    if graphs.is_empty() {
407        if saw_applied_graph && !quarantined_graphs.is_empty() {
408            diagnostics.push(Diagnostic::error(
409                "cluster_no_healthy_graphs",
410                CLUSTER_RECOVERIES_DIR,
411                "all applied graphs are quarantined by pending recovery sidecars; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry",
412            ));
413        } else {
414            diagnostics.push(Diagnostic::error(
415                "cluster_empty",
416                CLUSTER_STATE_FILE,
417                "the applied revision records no graphs; apply a cluster with at least one graph before serving from it",
418            ));
419        }
420    }
421    if has_errors(&diagnostics) {
422        diagnostics.extend(startup_diagnostics);
423        return Err(diagnostics);
424    }
425    Ok(ServingSnapshot {
426        graphs,
427        queries,
428        policies,
429        diagnostics: startup_diagnostics,
430    })
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn graph_layout_gating_does_no_io_for_non_cluster_shapes() {
        // Only `<root>/graphs/<id>.omni` matches; everything else is None.
        assert_eq!(
            cluster_root_of_graph_layout("/data/cluster/graphs/kb.omni").as_deref(),
            Some("/data/cluster")
        );
        assert_eq!(
            cluster_root_of_graph_layout("s3://bucket/prefix/graphs/kb.omni").as_deref(),
            Some("s3://bucket/prefix")
        );
        // trailing slashes are ignored before matching
        assert_eq!(
            cluster_root_of_graph_layout("/data/cluster/graphs/kb.omni/").as_deref(),
            Some("/data/cluster")
        );
        assert_eq!(cluster_root_of_graph_layout("./kb.omni"), None);
        assert_eq!(cluster_root_of_graph_layout("s3://bucket/kb.omni"), None);
        // nested id under graphs/ is not the cluster layout
        assert_eq!(cluster_root_of_graph_layout("/c/graphs/a/b.omni"), None);
        // not a .omni graph
        assert_eq!(cluster_root_of_graph_layout("/c/graphs/kb"), None);
        // empty root and empty id are rejected
        assert_eq!(cluster_root_of_graph_layout("/graphs/kb.omni"), None);
        assert_eq!(cluster_root_of_graph_layout("/c/graphs/.omni"), None);
    }

    #[tokio::test]
    async fn cluster_root_detected_only_when_state_ledger_present() {
        let temp = tempfile::tempdir().unwrap();
        let root = temp.path();
        std::fs::create_dir_all(root.join("graphs")).unwrap();
        let graph_uri = format!("{}/graphs/kb.omni", root.to_string_lossy());

        // No __cluster/state.json yet → not a cluster.
        assert_eq!(cluster_root_for_graph_uri(&graph_uri).await, None);

        // Lay down the state ledger → now it's a cluster-managed location.
        std::fs::create_dir_all(root.join("__cluster")).unwrap();
        std::fs::write(root.join(CLUSTER_STATE_FILE), "{}").unwrap();
        let detected = cluster_root_for_graph_uri(&graph_uri).await;
        assert!(detected.is_some(), "expected cluster root to be detected");

        // A non-cluster-shaped target never probes and is always None.
        assert_eq!(
            cluster_root_for_graph_uri(&format!("{}/plain.omni", root.to_string_lossy())).await,
            None
        );
    }
}