1use super::*;
5
6#[derive(Debug, Clone)]
8pub struct ServingGraph {
9 pub graph_id: String,
10 pub root: PathBuf,
11 pub embedding: Option<EmbeddingProviderConfig>,
12}
13
/// A stored query belonging to one served graph.
#[derive(Clone, Debug)]
pub struct ServingQuery {
    /// Identifier of the graph the query is attached to.
    pub graph_id: String,
    /// Name of the query within its graph.
    pub name: String,
    /// Query text, read from the store's verified payload for this resource.
    pub source: String,
}
21
/// A policy to enforce while serving, together with its bindings.
#[derive(Clone, Debug)]
pub struct ServingPolicy {
    /// Name of the policy.
    pub name: String,
    /// Policy text, read from the store's verified payload for this resource.
    pub source: String,
    /// Addresses the policy is bound to (for example `graph.<id>`). When the
    /// snapshot is built, bindings to quarantined graphs have already been
    /// removed from this list.
    pub applies_to: Vec<String>,
}
33
34#[derive(Debug, Clone)]
36pub struct ServingSnapshot {
37 pub graphs: Vec<ServingGraph>,
38 pub queries: Vec<ServingQuery>,
39 pub policies: Vec<ServingPolicy>,
40 pub diagnostics: Vec<Diagnostic>,
41}
42
43pub async fn read_serving_snapshot(
51 config_dir: impl AsRef<Path>,
52) -> Result<ServingSnapshot, Vec<Diagnostic>> {
53 let config_dir = config_dir.as_ref().to_path_buf();
54 let parsed = parse_cluster_config(&config_dir);
57 let storage_root = parsed.raw.as_ref().and_then(|raw| {
58 raw.storage
59 .as_deref()
60 .map(str::trim)
61 .filter(|root| !root.is_empty())
62 .map(|root| root.trim_end_matches('/').to_string())
63 });
64 let backend = match storage_root.as_deref() {
65 Some(root) => match ClusterStore::for_storage_root(root) {
66 Ok(backend) => backend,
67 Err(diagnostic) => return Err(vec![diagnostic]),
68 },
69 None => ClusterStore::for_config_dir(&config_dir),
70 };
71 read_snapshot_with_store(backend).await
72}
73
74pub async fn read_serving_snapshot_from_storage(
79 storage_root: &str,
80) -> Result<ServingSnapshot, Vec<Diagnostic>> {
81 let backend =
82 ClusterStore::for_storage_root(storage_root).map_err(|diagnostic| vec![diagnostic])?;
83 read_snapshot_with_store(backend).await
84}
85
86pub async fn cluster_root_for_graph_uri(graph_uri: &str) -> Option<String> {
97 let root = cluster_root_of_graph_layout(graph_uri)?;
98 let store = ClusterStore::for_storage_root(&root).ok()?;
99 store
100 .has_state()
101 .await
102 .then(|| store.display_root().to_string())
103}
104
105pub async fn resolve_graph_storage_uri(cluster: &str, graph_id: &str) -> Result<String, Diagnostic> {
119 let backend = open_cluster_backend(cluster)?;
120 let mut observations = backend.observations();
121 let snapshot = backend.read_state(&mut observations).await?;
122 let state = snapshot.state.ok_or_else(|| missing_state_diagnostic(cluster))?;
123 let address = format!("graph.{graph_id}");
124 if !state.applied_revision.resources.contains_key(&address) {
125 let applied = applied_graph_ids(&state);
126 return Err(Diagnostic::error(
127 "graph_not_applied",
128 address,
129 format!(
130 "graph `{graph_id}` is not applied in cluster `{cluster}` (applied graphs: [{}]); \
131 declare it in cluster.yaml and run `cluster apply`, or check the id",
132 applied.join(", ")
133 ),
134 ));
135 }
136 Ok(backend.graph_root(graph_id))
137}
138
139pub async fn cluster_graph_ids(cluster: &str) -> Result<Vec<String>, Diagnostic> {
144 let backend = open_cluster_backend(cluster)?;
145 let mut observations = backend.observations();
146 let snapshot = backend.read_state(&mut observations).await?;
147 let state = snapshot.state.ok_or_else(|| missing_state_diagnostic(cluster))?;
148 Ok(applied_graph_ids(&state))
149}
150
151fn open_cluster_backend(cluster: &str) -> Result<ClusterStore, Diagnostic> {
152 if cluster.contains("://") {
153 ClusterStore::for_storage_root(cluster)
154 } else {
155 Ok(ClusterStore::for_config_dir(Path::new(cluster)))
156 }
157}
158
159fn missing_state_diagnostic(cluster: &str) -> Diagnostic {
160 Diagnostic::error(
161 "cluster_state_missing",
162 CLUSTER_STATE_FILE,
163 format!("cluster `{cluster}` has no applied state; run `cluster apply` first"),
164 )
165}
166
167fn applied_graph_ids(state: &crate::types::ClusterState) -> Vec<String> {
168 let mut ids: Vec<String> = state
169 .applied_revision
170 .resources
171 .keys()
172 .filter_map(|a| a.strip_prefix("graph."))
173 .map(str::to_string)
174 .collect();
175 ids.sort();
176 ids
177}
178
/// Extracts `<root>` from a URI shaped like `<root>/graphs/<id>.omni`.
///
/// Trailing slashes are ignored. Returns `None` when the `.omni` suffix or the
/// `/graphs/` segment is missing, when `<root>` or `<id>` is empty, or when
/// `<id>` contains a `/`. This is pure string inspection; no I/O is performed.
fn cluster_root_of_graph_layout(graph_uri: &str) -> Option<String> {
    graph_uri
        .trim_end_matches('/')
        .strip_suffix(".omni")?
        .rsplit_once("/graphs/")
        .filter(|(root, id)| !root.is_empty() && !id.is_empty() && !id.contains('/'))
        .map(|(root, _)| root.to_owned())
}
191
192async fn read_snapshot_with_store(
193 backend: ClusterStore,
194) -> Result<ServingSnapshot, Vec<Diagnostic>> {
195 let mut diagnostics: Vec<Diagnostic> = Vec::new();
196 let mut startup_diagnostics: Vec<Diagnostic> = Vec::new();
197 let mut quarantined_graphs: BTreeSet<String> = BTreeSet::new();
198
199 let sidecar_diag_start = diagnostics.len();
203 let sidecars = backend.list_recovery_sidecars(&mut diagnostics).await;
204 for diagnostic in diagnostics.iter_mut().skip(sidecar_diag_start) {
210 diagnostic.severity = DiagnosticSeverity::Error;
211 }
212 for (path, sidecar) in sidecars {
213 if sidecar.graph_id.trim().is_empty() {
214 diagnostics.push(Diagnostic::error(
215 "cluster_recovery_unattributed",
216 path,
217 "recovery sidecar has no graph id; run a state-mutating cluster command to sweep it before serving",
218 ));
219 continue;
220 }
221 quarantined_graphs.insert(sidecar.graph_id.clone());
222 startup_diagnostics.push(Diagnostic::warning(
223 "cluster_recovery_pending",
224 graph_address(&sidecar.graph_id),
225 format!(
226 "graph `{}` is quarantined because interrupted operation `{}` awaits recovery; run any state-mutating cluster command (e.g. `cluster apply`) to sweep",
227 sidecar.graph_id, sidecar.operation_id
228 ),
229 ));
230 }
231 if has_errors(&diagnostics) {
232 return Err(diagnostics);
233 }
234
235 let mut observations = backend.observations();
236 let state = match backend.read_state(&mut observations).await {
237 Ok(snapshot) => match snapshot.state {
238 Some(state) => Some(state),
239 None => {
240 diagnostics.push(Diagnostic::error(
241 "cluster_state_missing",
242 CLUSTER_STATE_FILE,
243 "no cluster state ledger; run `cluster import` and `cluster apply` first",
244 ));
245 None
246 }
247 },
248 Err(diagnostic) => {
249 diagnostics.push(diagnostic);
250 None
251 }
252 };
253 let Some(state) = state else {
254 diagnostics.extend(startup_diagnostics);
255 return Err(diagnostics);
256 };
257
258 let required_embedding_providers: BTreeSet<String> = state
259 .applied_revision
260 .resources
261 .iter()
262 .filter_map(|(address, entry)| match resource_kind(address) {
263 ResourceKind::Graph(graph_id) if !quarantined_graphs.contains(&graph_id) => {
264 entry.embedding_provider.clone()
265 }
266 _ => None,
267 })
268 .collect();
269 let mut embedding_profiles: BTreeMap<String, EmbeddingProviderConfig> = BTreeMap::new();
270 for (address, entry) in &state.applied_revision.resources {
271 if !matches!(resource_kind(address), ResourceKind::EmbeddingProvider(_)) {
272 continue;
273 }
274 if !required_embedding_providers.contains(address) {
275 continue;
276 }
277 let Some(profile) = entry.embedding_profile.clone() else {
278 diagnostics.push(Diagnostic::error(
279 "embedding_provider_profile_missing",
280 address.clone(),
281 "no applied embedding provider profile recorded; re-run `cluster apply` to backfill",
282 ));
283 continue;
284 };
285 let actual_digest = embedding_provider_digest(&profile);
286 if actual_digest != entry.digest {
287 diagnostics.push(Diagnostic::error(
288 "embedding_provider_digest_mismatch",
289 address.clone(),
290 format!(
291 "applied embedding provider profile does not match its recorded digest (actual sha256:{actual_digest}); run `cluster refresh` then `cluster apply`, and restart"
292 ),
293 ));
294 continue;
295 }
296 embedding_profiles.insert(address.clone(), profile);
297 }
298
299 let mut graphs = Vec::new();
300 let mut queries = Vec::new();
301 let mut policies = Vec::new();
302 let mut saw_applied_graph = false;
303 for (address, entry) in &state.applied_revision.resources {
304 match resource_kind(address) {
305 ResourceKind::Graph(graph_id) => {
306 saw_applied_graph = true;
307 if quarantined_graphs.contains(&graph_id) {
308 continue;
309 }
310 let embedding = match entry.embedding_provider.as_deref() {
311 Some(provider_address) => match resource_kind(provider_address) {
312 ResourceKind::EmbeddingProvider(_) => {
313 match embedding_profiles.get(provider_address) {
314 Some(profile) => Some(profile.clone()),
315 None => {
316 diagnostics.push(Diagnostic::error(
317 "embedding_provider_missing",
318 address.clone(),
319 format!(
320 "graph references `{provider_address}`, but no applied embedding provider profile is available; re-run `cluster apply`"
321 ),
322 ));
323 None
324 }
325 }
326 }
327 _ => {
328 diagnostics.push(Diagnostic::error(
329 "wrong_kind_reference",
330 address.clone(),
331 format!(
332 "graph embedding_provider expects `provider.embedding.<name>`, got `{provider_address}`"
333 ),
334 ));
335 None
336 }
337 },
338 None => None,
339 };
340 graphs.push(ServingGraph {
341 root: PathBuf::from(backend.graph_root(&graph_id)),
342 graph_id,
343 embedding,
344 });
345 }
346 ResourceKind::Schema(_) => {}
347 kind @ ResourceKind::Query { .. } => {
348 let ResourceKind::Query { graph, name } = &kind else {
349 unreachable!()
350 };
351 if quarantined_graphs.contains(graph) {
352 continue;
353 }
354 match backend
355 .read_verified_payload(&kind, &entry.digest, address)
356 .await
357 {
358 Ok(source) => queries.push(ServingQuery {
359 graph_id: graph.clone(),
360 name: name.clone(),
361 source,
362 }),
363 Err(diagnostic) => diagnostics.push(diagnostic),
364 }
365 }
366 kind @ ResourceKind::Policy(_) => {
367 let ResourceKind::Policy(name) = &kind else {
368 unreachable!()
369 };
370 let Some(applies_to) = entry.applies_to.clone() else {
371 diagnostics.push(Diagnostic::error(
372 "policy_bindings_missing",
373 address.clone(),
374 "no applied applies_to bindings recorded (ledger predates binding metadata); re-run `cluster apply` to backfill",
375 ));
376 continue;
377 };
378 let applies_to: Vec<String> = applies_to
379 .into_iter()
380 .filter(|binding| {
381 binding
382 .strip_prefix("graph.")
383 .is_none_or(|graph| !quarantined_graphs.contains(graph))
384 })
385 .collect();
386 if applies_to.is_empty() {
387 continue;
388 }
389 match backend
390 .read_verified_payload(&kind, &entry.digest, address)
391 .await
392 {
393 Ok(source) => policies.push(ServingPolicy {
394 name: name.clone(),
395 source,
396 applies_to,
397 }),
398 Err(diagnostic) => diagnostics.push(diagnostic),
399 }
400 }
401 ResourceKind::EmbeddingProvider(_) => {}
402 ResourceKind::Unknown => {}
403 }
404 }
405
406 if graphs.is_empty() {
407 if saw_applied_graph && !quarantined_graphs.is_empty() {
408 diagnostics.push(Diagnostic::error(
409 "cluster_no_healthy_graphs",
410 CLUSTER_RECOVERIES_DIR,
411 "all applied graphs are quarantined by pending recovery sidecars; run any state-mutating cluster command (e.g. `cluster apply`) to sweep, then retry",
412 ));
413 } else {
414 diagnostics.push(Diagnostic::error(
415 "cluster_empty",
416 CLUSTER_STATE_FILE,
417 "the applied revision records no graphs; apply a cluster with at least one graph before serving from it",
418 ));
419 }
420 }
421 if has_errors(&diagnostics) {
422 diagnostics.extend(startup_diagnostics);
423 return Err(diagnostics);
424 }
425 Ok(ServingSnapshot {
426 graphs,
427 queries,
428 policies,
429 diagnostics: startup_diagnostics,
430 })
431}
432
#[cfg(test)]
mod tests {
    use super::*;

    /// The layout check is pure string inspection: only URIs shaped like
    /// `<root>/graphs/<id>.omni` yield a root.
    #[test]
    fn graph_layout_gating_does_no_io_for_non_cluster_shapes() {
        let accepted = [
            ("/data/cluster/graphs/kb.omni", "/data/cluster"),
            ("s3://bucket/prefix/graphs/kb.omni", "s3://bucket/prefix"),
        ];
        for (graph_uri, expected_root) in accepted {
            assert_eq!(
                cluster_root_of_graph_layout(graph_uri).as_deref(),
                Some(expected_root)
            );
        }

        let rejected = [
            // No `/graphs/` segment at all.
            "./kb.omni",
            "s3://bucket/kb.omni",
            // Graph id may not contain a path separator.
            "/c/graphs/a/b.omni",
            // Missing `.omni` suffix.
            "/c/graphs/kb",
        ];
        for graph_uri in rejected {
            assert_eq!(cluster_root_of_graph_layout(graph_uri), None);
        }
    }

    /// A matching layout alone is not enough; the store must also report state.
    #[tokio::test]
    async fn cluster_root_detected_only_when_state_ledger_present() {
        let temp = tempfile::tempdir().unwrap();
        let root = temp.path();
        let root_text = root.to_string_lossy();
        std::fs::create_dir_all(root.join("graphs")).unwrap();

        // Layout matches, but no state file has been written yet.
        let graph_uri = format!("{}/graphs/kb.omni", root_text);
        assert_eq!(cluster_root_for_graph_uri(&graph_uri).await, None);

        // Presumably `CLUSTER_STATE_FILE` lives under `__cluster/`, hence the
        // directory is created first.
        std::fs::create_dir_all(root.join("__cluster")).unwrap();
        std::fs::write(root.join(CLUSTER_STATE_FILE), "{}").unwrap();
        let detected = cluster_root_for_graph_uri(&graph_uri).await;
        assert!(detected.is_some(), "expected cluster root to be detected");

        // A URI outside the `graphs/` layout is rejected even with state present.
        let plain_uri = format!("{}/plain.omni", root_text);
        assert_eq!(cluster_root_for_graph_uri(&plain_uri).await, None);
    }
}