use super::*;
pub(crate) async fn load_cluster_settings(
cluster_dir: &PathBuf,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let cluster_arg = cluster_dir.to_string_lossy();
let snapshot = if cluster_arg.contains("://") {
omnigraph_cluster::read_serving_snapshot_from_storage(cluster_arg.as_ref()).await
} else {
omnigraph_cluster::read_serving_snapshot(cluster_dir).await
}
.map_err(|diagnostics| {
let details = diagnostics
.iter()
.map(|diagnostic| format!("[{}] {}: {}", diagnostic.code, diagnostic.path, diagnostic.message))
.collect::<Vec<_>>()
.join("\n ");
eyre!("the cluster at '{}' is not ready to serve:\n {details}", cluster_dir.display())
})?;
let mut server_policy: Option<PolicySource> = None;
let mut graph_policies: BTreeMap<String, PolicySource> = BTreeMap::new();
for policy in &snapshot.policies {
for binding in &policy.applies_to {
if binding == "cluster" {
if server_policy
.replace(PolicySource::Inline(policy.source.clone()))
.is_some()
{
bail!(
"multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
);
}
} else if let Some(graph_id) = binding.strip_prefix("graph.") {
if graph_policies
.insert(
graph_id.to_string(),
PolicySource::Inline(policy.source.clone()),
)
.is_some()
{
bail!(
"multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
);
}
} else {
bail!("unrecognized policy binding '{binding}' in the applied revision");
}
}
}
let mut graphs = Vec::new();
for graph in &snapshot.graphs {
let specs: Vec<queries::RegistrySpec> = snapshot
.queries
.iter()
.filter(|query| query.graph_id == graph.graph_id)
.map(|query| queries::RegistrySpec {
name: query.name.clone(),
source: query.source.clone(),
expose: true,
tool_name: None,
})
.collect();
let registry = QueryRegistry::from_specs(specs).map_err(|errors| {
let details = errors
.iter()
.map(|error| error.to_string())
.collect::<Vec<_>>()
.join("\n ");
eyre!(
"stored queries in the applied revision failed to parse:\n {details}\nrun `cluster refresh` then `cluster apply`, and restart"
)
})?;
graphs.push(GraphStartupConfig {
graph_id: graph.graph_id.clone(),
uri: graph.root.to_string_lossy().to_string(),
policy: graph_policies.get(&graph.graph_id).cloned(),
embedding: graph
.embedding
.as_ref()
.map(|profile| {
profile.resolve().map_err(|err| {
eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
})
})
.transpose()?,
queries: registry,
});
}
let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
.ok()
.map(|v| {
let trimmed = v.trim();
!trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
})
.unwrap_or(false);
Ok(ServerConfig {
mode: ServerConfigMode::Multi {
graphs,
config_path: cluster_dir.clone(),
server_policy,
},
bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
})
}
pub async fn load_server_settings(
cli_cluster: Option<&PathBuf>,
cli_bind: Option<String>,
cli_allow_unauthenticated: bool,
) -> Result<ServerConfig> {
let Some(cluster_dir) = cli_cluster else {
bail!(
"omnigraph-server boots from a cluster: pass --cluster <dir|s3://…> \
(the cluster's applied revision is the deployment artifact). The legacy \
single-graph boot (positional <URI>, --target, --config omnigraph.yaml) \
was removed in RFC-011."
);
};
load_cluster_settings(cluster_dir, cli_bind, cli_allow_unauthenticated).await
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ServerRuntimeState {
Open,
DefaultDeny,
PolicyEnabled,
}
pub fn classify_server_runtime_state(
has_tokens: bool,
has_policy: bool,
allow_unauthenticated: bool,
) -> Result<ServerRuntimeState> {
match (has_tokens, has_policy, allow_unauthenticated) {
(false, false, false) => bail!(
"server has no bearer tokens and no policy file configured. This is a fully \
open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
if you actually want that, otherwise configure bearer tokens (see \
docs/user/operations/server.md) and a graph or cluster policy bundle in \
the cluster config, then run `omnigraph cluster apply` and restart."
),
(false, false, true) => Ok(ServerRuntimeState::Open),
(true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
(false, true, _) => bail!(
"policy file is configured but no bearer tokens — every request would 401 \
because no token can ever match. Configure at least one bearer token (see \
docs/user/operations/server.md), or remove the policy file. To deny all unauthenticated \
traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
produces meaningful 403s with policy-decision logging instead of silent 401s."
),
(true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
}
}
pub(crate) fn normalize_bearer_token(value: Option<String>) -> Option<String> {
value
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
pub(crate) fn normalize_bearer_actor(value: String) -> Result<String> {
let value = value.trim().to_string();
if value.is_empty() {
bail!("bearer token actor names must not be blank");
}
Ok(value)
}
pub(crate) fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
let entries: HashMap<String, String> = serde_json::from_str(value)
.wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
Ok(entries.into_iter().collect())
}
pub(crate) fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
let contents = fs::read_to_string(path)
.wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
parse_bearer_tokens_json(&contents)
.wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
}
pub(crate) fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
let mut seen_actors = HashSet::new();
let mut seen_tokens = HashSet::new();
let mut normalized = Vec::with_capacity(entries.len());
for (actor, token) in entries {
let actor = normalize_bearer_actor(actor)?;
let Some(token) = normalize_bearer_token(Some(token)) else {
bail!("bearer token for actor '{actor}' must not be blank");
};
if !seen_actors.insert(actor.clone()) {
bail!("duplicate bearer token actor '{actor}'");
}
if !seen_tokens.insert(token.clone()) {
bail!("duplicate bearer token value configured");
}
normalized.push((actor, token));
}
normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
Ok(normalized)
}
pub(crate) fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
let mut entries = Vec::new();
if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
{
entries.push(("default".to_string(), token));
}
if let Some(path) =
normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
{
entries.extend(read_bearer_tokens_file(&path)?);
} else if let Some(json) =
normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
{
entries.extend(parse_bearer_tokens_json(&json)?);
}
validate_bearer_tokens(entries)
}
#[cfg(test)]
mod tests {
use super::{
GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
classify_server_runtime_state, hash_bearer_token, normalize_bearer_token,
parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
};
use serial_test::serial;
use std::env;
use std::fs;
use tempfile::tempdir;
#[test]
fn authorize_splits_decision_from_operational_error() {
use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
use std::sync::Arc;
fn req(action: PolicyAction) -> PolicyRequest {
PolicyRequest { action, branch: None, target_branch: None }
}
let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
assert!(matches!(
authorize(Some(&actor), None, req(PolicyAction::GraphList)).unwrap(),
Authz::Denied(_)
));
assert!(matches!(
authorize(Some(&actor), None, req(PolicyAction::Change)).unwrap(),
Authz::Denied(_)
));
assert!(matches!(
authorize(Some(&actor), None, req(PolicyAction::Read)).unwrap(),
Authz::Allowed
));
assert!(matches!(
authorize(None, None, req(PolicyAction::Read)).unwrap(),
Authz::Allowed
));
let policy: PolicyConfig = serde_yaml::from_str(
"version: 1\n\
groups:\n team: [act-alice]\n\
rules:\n - id: team-read\n allow:\n actors: { group: team }\n actions: [read]\n branch_scope: any\n",
)
.unwrap();
let engine = PolicyCompiler::compile(&policy, "graph").unwrap();
assert!(matches!(
authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
)
.unwrap(),
Authz::Allowed
));
match authorize(
Some(&actor),
Some(&engine),
PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
)
.unwrap()
{
Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
Authz::Allowed => panic!("change must be denied: only read is allowed"),
}
assert!(
authorize(None, Some(&engine), req(PolicyAction::Read)).is_err(),
"a missing actor with a policy installed is an operational error, not a deny"
);
}
#[test]
fn hash_bearer_token_produces_32_byte_output() {
let hash = hash_bearer_token("any-token");
assert_eq!(hash.len(), 32);
}
#[test]
fn validate_and_attach_gates_on_schema_and_collapses_empty() {
use crate::queries::{QueryRegistry, RegistrySpec};
use omnigraph_compiler::catalog::build_catalog;
use omnigraph_compiler::schema::parser::parse_schema;
let schema = parse_schema("node User {\nname: String\n}\n").unwrap();
let catalog = build_catalog(&schema).unwrap();
let spec = |name: &str, source: &str| RegistrySpec {
name: name.to_string(),
source: source.to_string(),
expose: false,
tool_name: None,
};
let empty =
super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
assert!(empty.is_none());
let ok = QueryRegistry::from_specs(vec![spec(
"find_user",
"query find_user() { match { $u: User } return { $u.name } }",
)])
.unwrap();
assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
let broken = QueryRegistry::from_specs(vec![spec(
"ghost",
"query ghost() { match { $w: Widget } return { $w.name } }",
)])
.unwrap();
let err = super::validate_and_attach(broken, &catalog, "graph-x").unwrap_err();
let msg = err.to_string();
assert!(msg.contains("graph-x"), "labels the graph: {msg}");
assert!(msg.contains("ghost"), "names the query: {msg}");
assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
}
#[test]
fn hash_bearer_token_is_deterministic() {
assert_eq!(
hash_bearer_token("stable-input"),
hash_bearer_token("stable-input"),
);
}
#[test]
fn hash_bearer_token_differs_for_different_inputs() {
assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
}
#[test]
fn hash_bearer_token_matches_known_sha256_vector() {
let hash = hash_bearer_token("abc");
let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
assert_eq!(
hex,
"ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
);
}
#[tokio::test]
async fn server_settings_require_cluster_boot_source() {
let error = super::load_server_settings(None, None, false)
.await
.unwrap_err();
assert!(
error.to_string().contains("boots from a cluster"),
"expected cluster-required error, got: {error}",
);
}
#[test]
fn classify_open_requires_explicit_unauthenticated_flag() {
let error = classify_server_runtime_state(false, false, false).unwrap_err();
let msg = error.to_string();
assert!(
msg.contains("--unauthenticated"),
"expected refusal message mentioning --unauthenticated, got: {msg}"
);
assert_eq!(
classify_server_runtime_state(false, false, true).unwrap(),
ServerRuntimeState::Open
);
}
#[test]
fn classify_tokens_without_policy_is_default_deny() {
assert_eq!(
classify_server_runtime_state(true, false, false).unwrap(),
ServerRuntimeState::DefaultDeny
);
assert_eq!(
classify_server_runtime_state(true, false, true).unwrap(),
ServerRuntimeState::DefaultDeny
);
}
#[tokio::test]
#[serial]
async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
let _guard = EnvGuard::set(&[
("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
("OMNIGRAPH_UNAUTHENTICATED", None),
]);
let temp = tempdir().unwrap();
let policy_path = temp.path().join("server-policy.yaml");
let config = ServerConfig {
mode: ServerConfigMode::Multi {
graphs: vec![GraphStartupConfig {
graph_id: "alpha".to_string(),
uri: temp
.path()
.join("alpha.omni")
.to_string_lossy()
.into_owned(),
policy: None,
embedding: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("omnigraph.yaml"),
server_policy: Some(crate::PolicySource::File(policy_path)),
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
};
let result = serve(config).await;
let err = result
.expect_err("serve should refuse to start in multi mode with policy but no tokens");
let msg = format!("{:?}", err);
assert!(
msg.contains("policy file is configured but no bearer tokens"),
"expected policy-without-tokens rejection in multi mode, got: {msg}",
);
}
#[tokio::test]
#[serial]
async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
let _guard = EnvGuard::set(&[
("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
("OMNIGRAPH_UNAUTHENTICATED", None),
]);
let temp = tempdir().unwrap();
let config = ServerConfig {
mode: ServerConfigMode::Multi {
graphs: vec![GraphStartupConfig {
graph_id: "default".to_string(),
uri: temp
.path()
.join("graph.omni")
.to_string_lossy()
.into_owned(),
policy: None,
embedding: None,
queries: crate::queries::QueryRegistry::default(),
}],
config_path: temp.path().join("cluster"),
server_policy: None,
},
bind: "127.0.0.1:0".to_string(),
allow_unauthenticated: false,
};
let result = serve(config).await;
let err =
result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
let msg = format!("{:?}", err);
assert!(
msg.contains("no bearer tokens") || msg.contains("policy file"),
"expected refusal message naming the misconfiguration, got: {msg}",
);
}
#[test]
fn classify_policy_enabled_requires_tokens() {
assert_eq!(
classify_server_runtime_state(true, true, false).unwrap(),
ServerRuntimeState::PolicyEnabled
);
assert_eq!(
classify_server_runtime_state(true, true, true).unwrap(),
ServerRuntimeState::PolicyEnabled
);
}
#[test]
fn classify_policy_without_tokens_is_rejected() {
for allow_unauthenticated in [false, true] {
let err =
classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("policy file is configured but no bearer tokens"),
"expected policy-without-tokens rejection message; got: {msg}"
);
assert!(
msg.contains("every request would 401"),
"rejection message must name the failure mode; got: {msg}"
);
}
}
#[test]
fn normalize_bearer_token_trims_and_filters_blank_values() {
assert_eq!(normalize_bearer_token(None), None);
assert_eq!(normalize_bearer_token(Some(" ".to_string())), None);
assert_eq!(
normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
Some("demo-token")
);
}
struct EnvGuard {
saved: Vec<(&'static str, Option<String>)>,
}
impl EnvGuard {
fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
let saved = vars
.iter()
.map(|(name, _)| (*name, env::var(name).ok()))
.collect::<Vec<_>>();
for (name, value) in vars {
unsafe {
match value {
Some(value) => env::set_var(name, value),
None => env::remove_var(name),
}
}
}
Self { saved }
}
}
impl Drop for EnvGuard {
fn drop(&mut self) {
for (name, value) in self.saved.drain(..) {
unsafe {
match value {
Some(value) => env::set_var(name, value),
None => env::remove_var(name),
}
}
}
}
}
#[test]
fn parse_bearer_tokens_json_reads_actor_token_map() {
let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
assert_eq!(tokens.len(), 2);
assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
}
#[test]
#[serial]
fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
let temp = tempdir().unwrap();
let tokens_path = temp.path().join("tokens.json");
fs::write(
&tokens_path,
r#"{"team-01":"token-one","team-02":"token-two"}"#,
)
.unwrap();
let _guard = EnvGuard::set(&[
("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
(
"OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
Some(tokens_path.to_str().unwrap()),
),
("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
]);
let tokens = server_bearer_tokens_from_env().unwrap();
assert_eq!(
tokens,
vec![
("default".to_string(), "legacy-token".to_string()),
("team-01".to_string(), "token-one".to_string()),
("team-02".to_string(), "token-two".to_string()),
]
);
}
}