1use super::*;
6
7pub(crate) async fn load_cluster_settings(
12 cluster_dir: &PathBuf,
13 cli_bind: Option<String>,
14 cli_allow_unauthenticated: bool,
15) -> Result<ServerConfig> {
16 let cluster_arg = cluster_dir.to_string_lossy();
23 let snapshot = if cluster_arg.contains("://") {
24 omnigraph_cluster::read_serving_snapshot_from_storage(cluster_arg.as_ref()).await
25 } else {
26 omnigraph_cluster::read_serving_snapshot(cluster_dir).await
27 }
28 .map_err(|diagnostics| {
29 let details = diagnostics
30 .iter()
31 .map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
32 .collect::<Vec<_>>()
33 .join("\n ");
34 eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
35 })?;
36
37 let mut server_policy: Option<PolicySource> = None;
41 let mut graph_policies: BTreeMap<String, PolicySource> = BTreeMap::new();
42 for policy in &snapshot.policies {
43 for binding in &policy.applies_to {
44 if binding == "cluster" {
45 if server_policy
46 .replace(PolicySource::Inline(policy.source.clone()))
47 .is_some()
48 {
49 bail!(
50 "multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
51 );
52 }
53 } else if let Some(graph_id) = binding.strip_prefix("graph.") {
54 if graph_policies
55 .insert(
56 graph_id.to_string(),
57 PolicySource::Inline(policy.source.clone()),
58 )
59 .is_some()
60 {
61 bail!(
62 "multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
63 );
64 }
65 } else {
66 bail!("unrecognized policy binding '{binding}' in the applied revision");
67 }
68 }
69 }
70
71 let mut graphs = Vec::new();
72 for graph in &snapshot.graphs {
73 let specs: Vec<queries::RegistrySpec> = snapshot
74 .queries
75 .iter()
76 .filter(|query| query.graph_id == graph.graph_id)
77 .map(|query| queries::RegistrySpec {
78 name: query.name.clone(),
79 source: query.source.clone(),
80 expose: true,
84 tool_name: None,
85 })
86 .collect();
87 let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
88 let details = errors
89 .iter()
90 .map(|error| error.to_string())
91 .collect::<Vec<_>>()
92 .join("\n ");
93 eyre!(
94 "stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
95 )
96 })?;
97 graphs.push(GraphStartupConfig {
98 graph_id: graph.graph_id.clone(),
99 uri: graph.root.to_string_lossy().to_string(),
100 policy: graph_policies.get(&graph.graph_id).cloned(),
101 embedding: graph
102 .embedding
103 .as_ref()
104 .map(|profile| {
105 profile.resolve().map_err(|err| {
106 eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
107 })
108 })
109 .transpose()?,
110 queries: registry,
111 });
112 }
113
114 let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
115 .ok()
116 .map(|v| {
117 let trimmed = v.trim();
118 !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
119 })
120 .unwrap_or(false);
121
122 Ok(ServerConfig {
123 mode: ServerConfigMode::Multi {
124 graphs,
125 config_path: cluster_dir.clone(),
126 server_policy,
127 },
128 bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
129 allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
130 })
131}
132
133pub async fn load_server_settings(
138 cli_cluster: Option<&PathBuf>,
139 cli_bind: Option<String>,
140 cli_allow_unauthenticated: bool,
141) -> Result<ServerConfig> {
142 let Some(cluster_dir) = cli_cluster else {
143 bail!(
144 "omnigraph-server boots from a cluster: pass --cluster <dir|s3://…> \
145 (the cluster's applied revision is the deployment artifact). The legacy \
146 single-graph boot (positional <URI>, --target, --config omnigraph.yaml) \
147 was removed in RFC-011."
148 );
149 };
150 load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await
151}
152
153#[derive(Debug, Clone, Copy, Eq, PartialEq)]
174pub enum ServerRuntimeState {
175 Open,
176 DefaultDeny,
177 PolicyEnabled,
178}
179
180pub fn classify_server_runtime_state(
191 has_tokens: bool,
192 has_policy: bool,
193 allow_unauthenticated: bool,
194) -> Result<ServerRuntimeState> {
195 match (has_tokens, has_policy, allow_unauthenticated) {
196 (false, false, false) => bail!(
197 "server has no bearer tokens and no policy file configured. This is a fully \
198 open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
199 if you actually want that, otherwise configure bearer tokens (see \
200 docs/user/operations/server.md) and a graph or cluster policy bundle in \
201 the cluster config, then run `omnigraph cluster apply` and restart."
202 ),
203 (false, false, true) => Ok(ServerRuntimeState::Open),
204 (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
205 (false, true, _) => bail!(
206 "policy file is configured but no bearer tokens — every request would 401 \
207 because no token can ever match. Configure at least one bearer token (see \
208 docs/user/operations/server.md), or remove the policy file. To deny all unauthenticated \
209 traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
210 produces meaningful 403s with policy-decision logging instead of silent 401s."
211 ),
212 (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
213 }
214}
215
216pub(crate) fn normalize_bearer_token(value: Option<String>) -> Option<String> {
217 value
218 .map(|value| value.trim().to_string())
219 .filter(|value| !value.is_empty())
220}
221
222pub(crate) fn normalize_bearer_actor(value: String) -> Result<String> {
223 let value = value.trim().to_string();
224 if value.is_empty() {
225 bail!("bearer token actor names must not be blank");
226 }
227 Ok(value)
228}
229
230pub(crate) fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
231 let entries: HashMap<String, String> = serde_json::from_str(value)
232 .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
233 Ok(entries.into_iter().collect())
234}
235
236pub(crate) fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
237 let contents = fs::read_to_string(path)
238 .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
239 parse_bearer_tokens_json(&contents)
240 .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
241}
242
243pub(crate) fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
244 let mut seen_actors = HashSet::new();
245 let mut seen_tokens = HashSet::new();
246 let mut normalized = Vec::with_capacity(entries.len());
247
248 for (actor, token) in entries {
249 let actor = normalize_bearer_actor(actor)?;
250 let Some(token) = normalize_bearer_token(Some(token)) else {
251 bail!("bearer token for actor '{actor}' must not be blank");
252 };
253 if !seen_actors.insert(actor.clone()) {
254 bail!("duplicate bearer token actor '{actor}'");
255 }
256 if !seen_tokens.insert(token.clone()) {
257 bail!("duplicate bearer token value configured");
258 }
259 normalized.push((actor, token));
260 }
261
262 normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
263 Ok(normalized)
264}
265
266pub(crate) fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
267 let mut entries = Vec::new();
268
269 if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
270 {
271 entries.push(("default".to_string(), token));
272 }
273
274 if let Some(path) =
275 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
276 {
277 entries.extend(read_bearer_tokens_file(&path)?);
278 } else if let Some(json) =
279 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
280 {
281 entries.extend(parse_bearer_tokens_json(&json)?);
282 }
283
284 validate_bearer_tokens(entries)
285}
286
287#[cfg(test)]
288mod tests {
289 use super::{
290 GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
291 classify_server_runtime_state, hash_bearer_token, normalize_bearer_token,
292 parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
293 };
294 use serial_test::serial;
295 use std::env;
296 use std::fs;
297 use tempfile::tempdir;
298
299 #[test]
303 fn authorize_splits_decision_from_operational_error() {
304 use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
305 use std::sync::Arc;
306
307 fn req(action: PolicyAction) -> PolicyRequest {
308 PolicyRequest { action, branch: None, target_branch: None }
309 }
310 let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
311
312 assert!(matches!(
315 authorize(Some(&actor), None, req(PolicyAction::GraphList)).unwrap(),
316 Authz::Denied(_)
317 ));
318 assert!(matches!(
320 authorize(Some(&actor), None, req(PolicyAction::Change)).unwrap(),
321 Authz::Denied(_)
322 ));
323 assert!(matches!(
325 authorize(Some(&actor), None, req(PolicyAction::Read)).unwrap(),
326 Authz::Allowed
327 ));
328 assert!(matches!(
330 authorize(None, None, req(PolicyAction::Read)).unwrap(),
331 Authz::Allowed
332 ));
333
334 let policy: PolicyConfig = serde_yaml::from_str(
336 "version: 1\n\
337 groups:\n team: [act-alice]\n\
338 rules:\n - id: team-read\n allow:\n actors: { group: team }\n actions: [read]\n branch_scope: any\n",
339 )
340 .unwrap();
341 let engine = PolicyCompiler::compile(&policy, "graph").unwrap();
342
343 assert!(matches!(
345 authorize(
346 Some(&actor),
347 Some(&engine),
348 PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
349 )
350 .unwrap(),
351 Authz::Allowed
352 ));
353 match authorize(
355 Some(&actor),
356 Some(&engine),
357 PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
358 )
359 .unwrap()
360 {
361 Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
362 Authz::Allowed => panic!("change must be denied: only read is allowed"),
363 }
364 assert!(
368 authorize(None, Some(&engine), req(PolicyAction::Read)).is_err(),
369 "a missing actor with a policy installed is an operational error, not a deny"
370 );
371 }
372
373 #[test]
374 fn hash_bearer_token_produces_32_byte_output() {
375 let hash = hash_bearer_token("any-token");
376 assert_eq!(hash.len(), 32);
377 }
378
379 #[test]
385 fn validate_and_attach_gates_on_schema_and_collapses_empty() {
386 use crate::queries::{QueryRegistry, RegistrySpec};
387 use omnigraph_compiler::catalog::build_catalog;
388 use omnigraph_compiler::schema::parser::parse_schema;
389
390 let schema = parse_schema("node User {\nname: String\n}\n").unwrap();
391 let catalog = build_catalog(&schema).unwrap();
392 let spec = |name: &str, source: &str| RegistrySpec {
393 name: name.to_string(),
394 source: source.to_string(),
395 expose: false,
396 tool_name: None,
397 };
398
399 let empty =
401 super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
402 assert!(empty.is_none());
403
404 let ok = QueryRegistry::from_specs(vec![spec(
406 "find_user",
407 "query find_user() { match { $u: User } return { $u.name } }",
408 )])
409 .unwrap();
410 assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
411
412 let broken = QueryRegistry::from_specs(vec![spec(
415 "ghost",
416 "query ghost() { match { $w: Widget } return { $w.name } }",
417 )])
418 .unwrap();
419 let err = super::validate_and_attach(broken, &catalog, "graph-x").unwrap_err();
420 let msg = err.to_string();
421 assert!(msg.contains("graph-x"), "labels the graph: {msg}");
422 assert!(msg.contains("ghost"), "names the query: {msg}");
423 assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
424 }
425
426 #[test]
427 fn hash_bearer_token_is_deterministic() {
428 assert_eq!(
429 hash_bearer_token("stable-input"),
430 hash_bearer_token("stable-input"),
431 );
432 }
433
434 #[test]
435 fn hash_bearer_token_differs_for_different_inputs() {
436 assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
437 }
438
439 #[test]
440 fn hash_bearer_token_matches_known_sha256_vector() {
441 let hash = hash_bearer_token("abc");
443 let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
444 assert_eq!(
445 hex,
446 "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
447 );
448 }
449
450 #[tokio::test]
451 async fn server_settings_require_cluster_boot_source() {
452 let error = super::load_server_settings(None, None, false)
455 .await
456 .unwrap_err();
457 assert!(
458 error.to_string().contains("boots from a cluster"),
459 "expected cluster-required error, got: {error}",
460 );
461 }
462
463 #[test]
464 fn classify_open_requires_explicit_unauthenticated_flag() {
465 let error = classify_server_runtime_state(false, false, false).unwrap_err();
467 let msg = error.to_string();
468 assert!(
469 msg.contains("--unauthenticated"),
470 "expected refusal message mentioning --unauthenticated, got: {msg}"
471 );
472
473 assert_eq!(
475 classify_server_runtime_state(false, false, true).unwrap(),
476 ServerRuntimeState::Open
477 );
478 }
479
480 #[test]
481 fn classify_tokens_without_policy_is_default_deny() {
482 assert_eq!(
486 classify_server_runtime_state(true, false, false).unwrap(),
487 ServerRuntimeState::DefaultDeny
488 );
489 assert_eq!(
490 classify_server_runtime_state(true, false, true).unwrap(),
491 ServerRuntimeState::DefaultDeny
492 );
493 }
494
495 #[tokio::test]
496 #[serial]
497 async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
498 let _guard = EnvGuard::set(&[
507 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
508 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
509 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
510 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
511 ("OMNIGRAPH_UNAUTHENTICATED", None),
512 ]);
513 let temp = tempdir().unwrap();
514 let policy_path = temp.path().join("server-policy.yaml");
519 let config = ServerConfig {
520 mode: ServerConfigMode::Multi {
521 graphs: vec![GraphStartupConfig {
522 graph_id: "alpha".to_string(),
523 uri: temp
524 .path()
525 .join("alpha.omni")
526 .to_string_lossy()
527 .into_owned(),
528 policy: None,
529 embedding: None,
530 queries: crate::queries::QueryRegistry::default(),
531 }],
532 config_path: temp.path().join("omnigraph.yaml"),
533 server_policy: Some(crate::PolicySource::File(policy_path)),
534 },
535 bind: "127.0.0.1:0".to_string(),
536 allow_unauthenticated: false,
537 };
538 let result = serve(config).await;
539 let err = result
540 .expect_err("serve should refuse to start in multi mode with policy but no tokens");
541 let msg = format!("{:?}", err);
542 assert!(
543 msg.contains("policy file is configured but no bearer tokens"),
544 "expected policy-without-tokens rejection in multi mode, got: {msg}",
545 );
546 }
547
548 #[tokio::test]
549 #[serial]
550 async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
551 let _guard = EnvGuard::set(&[
562 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
563 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
564 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
565 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
566 ("OMNIGRAPH_UNAUTHENTICATED", None),
567 ]);
568 let temp = tempdir().unwrap();
569 let config = ServerConfig {
572 mode: ServerConfigMode::Multi {
573 graphs: vec![GraphStartupConfig {
574 graph_id: "default".to_string(),
575 uri: temp
576 .path()
577 .join("graph.omni")
578 .to_string_lossy()
579 .into_owned(),
580 policy: None,
581 embedding: None,
582 queries: crate::queries::QueryRegistry::default(),
583 }],
584 config_path: temp.path().join("cluster"),
585 server_policy: None,
586 },
587 bind: "127.0.0.1:0".to_string(),
588 allow_unauthenticated: false,
589 };
590 let result = serve(config).await;
591 let err =
592 result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
593 let msg = format!("{:?}", err);
594 assert!(
595 msg.contains("no bearer tokens") || msg.contains("policy file"),
596 "expected refusal message naming the misconfiguration, got: {msg}",
597 );
598 }
599
600 #[test]
601 fn classify_policy_enabled_requires_tokens() {
602 assert_eq!(
606 classify_server_runtime_state(true, true, false).unwrap(),
607 ServerRuntimeState::PolicyEnabled
608 );
609 assert_eq!(
610 classify_server_runtime_state(true, true, true).unwrap(),
611 ServerRuntimeState::PolicyEnabled
612 );
613 }
614
615 #[test]
616 fn classify_policy_without_tokens_is_rejected() {
617 for allow_unauthenticated in [false, true] {
624 let err =
625 classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
626 let msg = err.to_string();
627 assert!(
628 msg.contains("policy file is configured but no bearer tokens"),
629 "expected policy-without-tokens rejection message; got: {msg}"
630 );
631 assert!(
632 msg.contains("every request would 401"),
633 "rejection message must name the failure mode; got: {msg}"
634 );
635 }
636 }
637
638 #[test]
639 fn normalize_bearer_token_trims_and_filters_blank_values() {
640 assert_eq!(normalize_bearer_token(None), None);
641 assert_eq!(normalize_bearer_token(Some(" ".to_string())), None);
642 assert_eq!(
643 normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
644 Some("demo-token")
645 );
646 }
647
648 struct EnvGuard {
649 saved: Vec<(&'static str, Option<String>)>,
650 }
651
652 impl EnvGuard {
653 fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
654 let saved = vars
655 .iter()
656 .map(|(name, _)| (*name, env::var(name).ok()))
657 .collect::<Vec<_>>();
658 for (name, value) in vars {
659 unsafe {
660 match value {
661 Some(value) => env::set_var(name, value),
662 None => env::remove_var(name),
663 }
664 }
665 }
666 Self { saved }
667 }
668 }
669
670 impl Drop for EnvGuard {
671 fn drop(&mut self) {
672 for (name, value) in self.saved.drain(..) {
673 unsafe {
674 match value {
675 Some(value) => env::set_var(name, value),
676 None => env::remove_var(name),
677 }
678 }
679 }
680 }
681 }
682
683 #[test]
684 fn parse_bearer_tokens_json_reads_actor_token_map() {
685 let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
686 assert_eq!(tokens.len(), 2);
687 assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
688 assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
689 }
690
691 #[test]
692 #[serial]
693 fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
694 let temp = tempdir().unwrap();
695 let tokens_path = temp.path().join("tokens.json");
696 fs::write(
697 &tokens_path,
698 r#"{"team-01":"token-one","team-02":"token-two"}"#,
699 )
700 .unwrap();
701
702 let _guard = EnvGuard::set(&[
703 ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
704 (
705 "OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
706 Some(tokens_path.to_str().unwrap()),
707 ),
708 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
709 ]);
710
711 let tokens = server_bearer_tokens_from_env().unwrap();
712 assert_eq!(
713 tokens,
714 vec![
715 ("default".to_string(), "legacy-token".to_string()),
716 ("team-01".to_string(), "token-one".to_string()),
717 ("team-02".to_string(), "token-two".to_string()),
718 ]
719 );
720 }
721}