Skip to main content

omnigraph_server/
settings.rs

1//! Server settings: cluster/CLI/env resolution, bearer-token sources, and
2//! runtime-state classification (moved verbatim from lib.rs in the
3//! modularization).
4
5use super::*;
6
7/// Build serving settings from a cluster directory's applied revision
8/// (RFC-005 §D2): graphs at derived roots, stored queries from verified
9/// catalog blob content, policy bundles from blob paths with their applied
10/// bindings. Always multi-graph routing.
11pub(crate) async fn load_cluster_settings(
12    cluster_dir: &PathBuf,
13    cli_bind: Option<String>,
14    cli_allow_unauthenticated: bool,
15    cli_require_all_graphs: bool,
16) -> Result<ServerConfig> {
17    // `--cluster` accepts either a config directory (the ledger location is
18    // resolved through cluster.yaml's `storage:` key) or a storage-root URI
19    // directly (`s3://bucket/prefix`) — config-free serving: the ledger and
20    // catalog on the bucket ARE the deployment artifact.
21    // Any scheme-qualified argument (s3://, file://) is a storage root; a
22    // bare path is a config directory.
23    let cluster_arg = cluster_dir.to_string_lossy();
24    let snapshot = if cluster_arg.contains("://") {
25        omnigraph_cluster::read_serving_snapshot_from_storage(cluster_arg.as_ref()).await
26    } else {
27        omnigraph_cluster::read_serving_snapshot(cluster_dir).await
28    }
29    .map_err(|diagnostics| {
30        let details = diagnostics
31            .iter()
32            .map(|diagnostic| {
33                format!(
34                    "[{}] {}: {}",
35                    diagnostic.code, diagnostic.path, diagnostic.message
36                )
37            })
38            .collect::<Vec<_>>()
39            .join("\n  ");
40        eyre!(
41            "the cluster at '{}' is not ready to serve:\n  {details}",
42            cluster_dir.display()
43        )
44    })?;
45    for diagnostic in &snapshot.diagnostics {
46        warn!(
47            code = %diagnostic.code,
48            path = %diagnostic.path,
49            message = %diagnostic.message,
50            "cluster startup diagnostic"
51        );
52    }
53    let env_require_all_graphs = env_flag("OMNIGRAPH_REQUIRE_ALL_GRAPHS");
54    let require_all_graphs = cli_require_all_graphs || env_require_all_graphs;
55    if require_all_graphs && !snapshot.diagnostics.is_empty() {
56        let details = snapshot
57            .diagnostics
58            .iter()
59            .map(|diagnostic| {
60                format!(
61                    "[{}] {}: {}",
62                    diagnostic.code, diagnostic.path, diagnostic.message
63                )
64            })
65            .collect::<Vec<_>>()
66            .join("\n  ");
67        bail!(
68            "strict cluster boot requires every applied graph to be ready; startup diagnostics:\n  {details}"
69        );
70    }
71
72    // Bindings -> Cedar slots. The serving pipeline loads one bundle per
73    // graph plus one server-level bundle; stacked bundles per scope are a
74    // later slice — refuse loudly rather than silently merging policy.
75    let mut server_policy: Option<PolicySource> = None;
76    let mut graph_policies: BTreeMap<String, PolicySource> = BTreeMap::new();
77    for policy in &snapshot.policies {
78        for binding in &policy.applies_to {
79            if binding == "cluster" {
80                if server_policy
81                    .replace(PolicySource::Inline(policy.source.clone()))
82                    .is_some()
83                {
84                    bail!(
85                        "multiple policy bundles bind the cluster scope; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
86                    );
87                }
88            } else if let Some(graph_id) = binding.strip_prefix("graph.") {
89                if graph_policies
90                    .insert(
91                        graph_id.to_string(),
92                        PolicySource::Inline(policy.source.clone()),
93                    )
94                    .is_some()
95                {
96                    bail!(
97                        "multiple policy bundles bind graph '{graph_id}'; cluster-mode serving supports one bundle per scope — split or merge bundles (multi-bundle scopes are a later slice)"
98                    );
99                }
100            } else {
101                bail!("unrecognized policy binding '{binding}' in the applied revision");
102            }
103        }
104    }
105
106    let mut graphs = Vec::new();
107    let mut skipped_graphs = Vec::new();
108    for graph in &snapshot.graphs {
109        let specs: Vec<queries::RegistrySpec> = snapshot
110            .queries
111            .iter()
112            .filter(|query| query.graph_id == graph.graph_id)
113            .map(|query| queries::RegistrySpec {
114                name: query.name.clone(),
115                source: query.source.clone(),
116                // The §D5 bridge: the cluster registry has no expose flag
117                // (exposure becomes a policy decision in Phase 6) — cluster
118                // mode lists every stored query.
119                expose: true,
120                tool_name: None,
121            })
122            .collect();
123        let registry = match QueryRegistry::from_specs(specs) {
124            Ok(registry) => registry,
125            Err(errors) => {
126                let details = errors
127                    .iter()
128                    .map(|error| error.to_string())
129                    .collect::<Vec<_>>()
130                    .join("\n  ");
131                warn!(
132                    graph_id = %graph.graph_id,
133                    errors = %details,
134                    "graph quarantined because stored queries failed to parse"
135                );
136                skipped_graphs.push(format!(
137                    "{}: stored queries failed to parse: {details}",
138                    graph.graph_id
139                ));
140                continue;
141            }
142        };
143        let embedding = match graph
144            .embedding
145            .as_ref()
146            .map(|profile| {
147                profile.resolve().map_err(|err| {
148                    eyre!("embedding provider for graph '{}': {err}", graph.graph_id)
149                })
150            })
151            .transpose()
152        {
153            Ok(embedding) => embedding,
154            Err(err) => {
155                warn!(
156                    graph_id = %graph.graph_id,
157                    error = %err,
158                    "graph quarantined because embedding provider configuration failed"
159                );
160                skipped_graphs.push(format!("{}: {err}", graph.graph_id));
161                continue;
162            }
163        };
164        graphs.push(GraphStartupConfig {
165            graph_id: graph.graph_id.clone(),
166            uri: graph.root.to_string_lossy().to_string(),
167            policy: graph_policies.get(&graph.graph_id).cloned(),
168            embedding,
169            queries: registry,
170        });
171    }
172    if graphs.is_empty() {
173        let skipped = skipped_graphs.join(", ");
174        bail!(
175            "the cluster at '{}' has no healthy graphs to serve{}",
176            cluster_dir.display(),
177            if skipped.is_empty() {
178                String::new()
179            } else {
180                format!(" (quarantined: {skipped})")
181            }
182        );
183    }
184    if require_all_graphs && !skipped_graphs.is_empty() {
185        bail!(
186            "strict cluster boot requires every graph to build startup settings (quarantined: {})",
187            skipped_graphs.join(", ")
188        );
189    }
190
191    let env_unauth = env_flag("OMNIGRAPH_UNAUTHENTICATED");
192
193    Ok(ServerConfig {
194        mode: ServerConfigMode::Multi {
195            graphs,
196            config_path: cluster_dir.clone(),
197            server_policy,
198        },
199        bind: cli_bind.unwrap_or_else(|| "127.0.0.1:8080".to_string()),
200        allow_unauthenticated: cli_allow_unauthenticated || env_unauth,
201        require_all_graphs,
202    })
203}
204
205/// RFC-011 cluster-only boot: the server serves exclusively from a
206/// cluster's applied revision (`--cluster <dir | s3://…>`). The legacy
207/// omnigraph.yaml / `--target` / positional-URI single-graph boot paths
208/// were removed — a deployment serves from exactly one source.
209pub async fn load_server_settings(
210    cli_cluster: Option<&PathBuf>,
211    cli_bind: Option<String>,
212    cli_allow_unauthenticated: bool,
213    cli_require_all_graphs: bool,
214) -> Result<ServerConfig> {
215    let Some(cluster_dir) = cli_cluster else {
216        bail!(
217            "omnigraph-server boots from a cluster: pass --cluster <dir|s3://…> \
218             (the cluster's applied revision is the deployment artifact). The legacy \
219             single-graph boot (positional <URI>, --target, --config omnigraph.yaml) \
220             was removed in RFC-011."
221        );
222    };
223    load_cluster_settings(
224        cluster_dir,
225        cli_bind,
226        cli_allow_unauthenticated,
227        cli_require_all_graphs,
228    )
229    .await
230}
231
/// Interpret the environment variable `name` as a boolean switch.
///
/// Returns `false` when the variable cannot be read as a string, or when its
/// trimmed value is empty, `0`, or `false` (ASCII case-insensitive). Any
/// other value counts as `true`.
fn env_flag(name: &str) -> bool {
    match std::env::var(name) {
        Ok(raw) => {
            let value = raw.trim();
            !(value.is_empty() || value == "0" || value.eq_ignore_ascii_case("false"))
        }
        Err(_) => false,
    }
}
241
/// MR-723 server runtime state, classified at startup from the matrix of
/// (bearer tokens configured) × (policy file configured).
///
/// Policy without tokens is not a state: it is rejected at startup, because
/// such a server would 401 every request, which is bug-shaped rather than
/// feature-shaped. Operators wanting "deny all unauthenticated traffic"
/// should configure tokens plus a deny-all policy to get meaningful 403s
/// with policy-decision logging instead.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ServerRuntimeState {
    /// Neither tokens nor policy; requires explicit `allow_unauthenticated`.
    /// Effectively a "trust the network" dev mode. `serve()` refuses to
    /// start in this shape without the flag, so the only way to reach this
    /// state at runtime is via deliberate operator opt-in.
    Open,
    /// Tokens configured but no policy file. The server requires a valid
    /// bearer token; once authenticated, every action except `Read` is
    /// denied with 403. Closes the "tokens but forgot the policy file" trap.
    DefaultDeny,
    /// Policy file configured and at least one bearer token configured.
    /// Cedar evaluates every authenticated request.
    PolicyEnabled,
}
268
269/// Compute the [`ServerRuntimeState`] from the configured inputs.
270/// Pulled out as a pure function so the matrix is unit-testable
271/// without standing up the full server.
272///
273/// The classifier is the **single source of truth** for "should we
274/// start?" — both `serve()`'s single-mode and multi-mode branches
275/// call this before constructing their `AppState`. Adding a startup
276/// invariant here means both modes enforce it automatically; the
277/// alternative (per-constructor `bail!`) drifts the moment a third
278/// mode is added.
279pub fn classify_server_runtime_state(
280    has_tokens: bool,
281    has_policy: bool,
282    allow_unauthenticated: bool,
283) -> Result<ServerRuntimeState> {
284    match (has_tokens, has_policy, allow_unauthenticated) {
285        (false, false, false) => bail!(
286            "server has no bearer tokens and no policy file configured. This is a fully \
287             open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
288             if you actually want that, otherwise configure bearer tokens (see \
289             docs/user/operations/server.md) and a graph or cluster policy bundle in \
290             the cluster config, then run `omnigraph cluster apply` and restart."
291        ),
292        (false, false, true) => Ok(ServerRuntimeState::Open),
293        (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
294        (false, true, _) => bail!(
295            "policy file is configured but no bearer tokens — every request would 401 \
296             because no token can ever match. Configure at least one bearer token (see \
297             docs/user/operations/server.md), or remove the policy file. To deny all unauthenticated \
298             traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
299             produces meaningful 403s with policy-decision logging instead of silent 401s."
300        ),
301        (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
302    }
303}
304
/// Trim surrounding whitespace from an optional token value.
///
/// Returns `None` when the input is `None` or blank after trimming;
/// otherwise returns the trimmed value.
pub(crate) fn normalize_bearer_token(value: Option<String>) -> Option<String> {
    let trimmed = value?.trim().to_string();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
310
311pub(crate) fn normalize_bearer_actor(value: String) -> Result<String> {
312    let value = value.trim().to_string();
313    if value.is_empty() {
314        bail!("bearer token actor names must not be blank");
315    }
316    Ok(value)
317}
318
319pub(crate) fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
320    let entries: HashMap<String, String> = serde_json::from_str(value)
321        .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
322    Ok(entries.into_iter().collect())
323}
324
325pub(crate) fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
326    let contents = fs::read_to_string(path)
327        .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
328    parse_bearer_tokens_json(&contents)
329        .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
330}
331
332pub(crate) fn validate_bearer_tokens(
333    entries: Vec<(String, String)>,
334) -> Result<Vec<(String, String)>> {
335    let mut seen_actors = HashSet::new();
336    let mut seen_tokens = HashSet::new();
337    let mut normalized = Vec::with_capacity(entries.len());
338
339    for (actor, token) in entries {
340        let actor = normalize_bearer_actor(actor)?;
341        let Some(token) = normalize_bearer_token(Some(token)) else {
342            bail!("bearer token for actor '{actor}' must not be blank");
343        };
344        if !seen_actors.insert(actor.clone()) {
345            bail!("duplicate bearer token actor '{actor}'");
346        }
347        if !seen_tokens.insert(token.clone()) {
348            bail!("duplicate bearer token value configured");
349        }
350        normalized.push((actor, token));
351    }
352
353    normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
354    Ok(normalized)
355}
356
357pub(crate) fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
358    let mut entries = Vec::new();
359
360    if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
361    {
362        entries.push(("default".to_string(), token));
363    }
364
365    if let Some(path) =
366        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
367    {
368        entries.extend(read_bearer_tokens_file(&path)?);
369    } else if let Some(json) =
370        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
371    {
372        entries.extend(parse_bearer_tokens_json(&json)?);
373    }
374
375    validate_bearer_tokens(entries)
376}
377
378#[cfg(test)]
379mod tests {
380    use super::{
381        GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
382        classify_server_runtime_state, hash_bearer_token, normalize_bearer_token,
383        parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
384    };
385    use serial_test::serial;
386    use std::env;
387    use std::fs;
388    use tempfile::tempdir;
389
    /// `authorize` returns the allow/deny **decision** (`Authz`) and reserves
    /// `Err` for operational failures, so the invoke handler can hide a denial
    /// as 404 without also masking a 401/500. Pins each outcome, first with no
    /// policy engine and then with one compiled from an inline YAML policy.
    #[test]
    fn authorize_splits_decision_from_operational_error() {
        use super::{
            Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor,
            authorize,
        };
        use std::sync::Arc;

        // Shorthand for a request that names only the action (no branches).
        fn req(action: PolicyAction) -> PolicyRequest {
            PolicyRequest {
                action,
                branch: None,
                target_branch: None,
            }
        }
        let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));

        // --- No policy engine installed (open / default-deny modes) ---
        // A server-scoped action is denied in every no-policy state.
        assert!(matches!(
            authorize(Some(&actor), None, req(PolicyAction::GraphList)).unwrap(),
            Authz::Denied(_)
        ));
        // Authenticated actor + a non-read per-graph action → default-deny.
        assert!(matches!(
            authorize(Some(&actor), None, req(PolicyAction::Change)).unwrap(),
            Authz::Denied(_)
        ));
        // `read` is the one per-graph action permitted without a policy.
        assert!(matches!(
            authorize(Some(&actor), None, req(PolicyAction::Read)).unwrap(),
            Authz::Allowed
        ));
        // Open mode (no actor, no policy) → allowed.
        assert!(matches!(
            authorize(None, None, req(PolicyAction::Read)).unwrap(),
            Authz::Allowed
        ));

        // --- Policy engine installed ---
        // One rule: members of group `team` (just `act-alice`) may `read`
        // on any branch.
        let policy: PolicyConfig = serde_yaml::from_str(
            "version: 1\n\
             groups:\n  team: [act-alice]\n\
             rules:\n  - id: team-read\n    allow:\n      actors: { group: team }\n      actions: [read]\n      branch_scope: any\n",
        )
        .unwrap();
        let engine = PolicyCompiler::compile(&policy, "graph").unwrap();

        // A matched allow rule → Allowed.
        assert!(matches!(
            authorize(
                Some(&actor),
                Some(&engine),
                PolicyRequest {
                    action: PolicyAction::Read,
                    branch: Some("main".to_string()),
                    target_branch: None
                },
            )
            .unwrap(),
            Authz::Allowed
        ));
        // Known actor, no matching allow rule → Denied, carrying the decision message.
        match authorize(
            Some(&actor),
            Some(&engine),
            PolicyRequest {
                action: PolicyAction::Change,
                branch: Some("main".to_string()),
                target_branch: None,
            },
        )
        .unwrap()
        {
            Authz::Denied(message) => {
                assert!(!message.is_empty(), "a deny carries its decision message")
            }
            Authz::Allowed => panic!("change must be denied: only read is allowed"),
        }
        // Policy installed but no actor → operational failure (`Err`), NOT a
        // decision. This is the split that keeps a 401/500 from being masked
        // as the denial's response in the invoke handler.
        assert!(
            authorize(None, Some(&engine), req(PolicyAction::Read)).is_err(),
            "a missing actor with a policy installed is an operational error, not a deny"
        );
    }
480
481    #[test]
482    fn hash_bearer_token_produces_32_byte_output() {
483        let hash = hash_bearer_token("any-token");
484        assert_eq!(hash.len(), 32);
485    }
486
    /// The single gate both open paths funnel through: it refuses a
    /// schema breakage (naming the graph label + query), attaches a clean
    /// registry, and collapses an empty one to `None`. Pure over its args
    /// (no engine), so it covers the multi-graph path's logic too — the
    /// only per-path difference is the `label`, asserted here.
    #[test]
    fn validate_and_attach_gates_on_schema_and_collapses_empty() {
        use crate::queries::{QueryRegistry, RegistrySpec};
        use omnigraph_compiler::catalog::build_catalog;
        use omnigraph_compiler::schema::parser::parse_schema;

        // Minimal schema: a single `User` node type with a `name` field.
        let schema = parse_schema("node User {\nname: String\n}\n").unwrap();
        let catalog = build_catalog(&schema).unwrap();
        // Builds an unexposed registry spec from a name and query source.
        let spec = |name: &str, source: &str| RegistrySpec {
            name: name.to_string(),
            source: source.to_string(),
            expose: false,
            tool_name: None,
        };

        // Empty registry → nothing attached, no error.
        let empty = super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
        assert!(empty.is_none());

        // A query that type-checks → attached.
        let ok = QueryRegistry::from_specs(vec![spec(
            "find_user",
            "query find_user() { match { $u: User } return { $u.name } }",
        )])
        .unwrap();
        assert!(
            super::validate_and_attach(ok, &catalog, "g")
                .unwrap()
                .is_some()
        );

        // A query referencing a type the schema lacks → boot refusal that
        // names both the graph label and the offending query. Building the
        // registry itself still succeeds; the failure comes from the gate.
        let broken = QueryRegistry::from_specs(vec![spec(
            "ghost",
            "query ghost() { match { $w: Widget } return { $w.name } }",
        )])
        .unwrap();
        let err = super::validate_and_attach(broken, &catalog, "graph-x").unwrap_err();
        let msg = err.to_string();
        assert!(msg.contains("graph-x"), "labels the graph: {msg}");
        assert!(msg.contains("ghost"), "names the query: {msg}");
        assert!(
            msg.contains("schema check"),
            "mentions the schema check: {msg}"
        );
    }
539
540    #[test]
541    fn hash_bearer_token_is_deterministic() {
542        assert_eq!(
543            hash_bearer_token("stable-input"),
544            hash_bearer_token("stable-input"),
545        );
546    }
547
548    #[test]
549    fn hash_bearer_token_differs_for_different_inputs() {
550        assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
551    }
552
553    #[test]
554    fn hash_bearer_token_matches_known_sha256_vector() {
555        // SHA-256("abc"). If this ever fails, the hash function was swapped.
556        let hash = hash_bearer_token("abc");
557        let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
558        assert_eq!(
559            hex,
560            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
561        );
562    }
563
564    #[tokio::test]
565    async fn server_settings_require_cluster_boot_source() {
566        // RFC-011 cluster-only: with no --cluster the server refuses to
567        // start and names the cluster-required remedy.
568        let error = super::load_server_settings(None, None, false, false)
569            .await
570            .unwrap_err();
571        assert!(
572            error.to_string().contains("boots from a cluster"),
573            "expected cluster-required error, got: {error}",
574        );
575    }
576
577    #[test]
578    fn classify_open_requires_explicit_unauthenticated_flag() {
579        // State 1: no tokens, no policy, no flag → refuse to start.
580        let error = classify_server_runtime_state(false, false, false).unwrap_err();
581        let msg = error.to_string();
582        assert!(
583            msg.contains("--unauthenticated"),
584            "expected refusal message mentioning --unauthenticated, got: {msg}"
585        );
586
587        // Same matrix cell but with the flag set → Open mode permitted.
588        assert_eq!(
589            classify_server_runtime_state(false, false, true).unwrap(),
590            ServerRuntimeState::Open
591        );
592    }
593
594    #[test]
595    fn classify_tokens_without_policy_is_default_deny() {
596        // State 2: tokens configured, no policy → DefaultDeny regardless
597        // of the flag (the flag opts into the fully-open dev mode; it
598        // doesn't downgrade default-deny back to open).
599        assert_eq!(
600            classify_server_runtime_state(true, false, false).unwrap(),
601            ServerRuntimeState::DefaultDeny
602        );
603        assert_eq!(
604            classify_server_runtime_state(true, false, true).unwrap(),
605            ServerRuntimeState::DefaultDeny
606        );
607    }
608
    /// `serve()` must reject a multi-graph config that carries a server
    /// policy while no bearer-token environment variable is set.
    #[tokio::test]
    #[serial]
    async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
        // Bug 2 from the bot-review pass: multi-mode startup was missing
        // the "policy requires tokens" check that single-mode enforces.
        // After centralizing the check in `classify_server_runtime_state`,
        // both modes get the same enforcement. This test guards the
        // multi-mode propagation path.
        //
        // NOTE(review): an earlier comment here said the sibling test below
        // pins single mode, but that test also builds a
        // `ServerConfigMode::Multi` config — the wording looks like it
        // predates the RFC-011 cluster-only boot. Confirm and reword.
        //
        // `#[serial]` plus the guard: every bearer-token variable and the
        // unauthenticated flag are cleared for the duration of the test.
        let _guard = EnvGuard::set(&[
            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
            ("OMNIGRAPH_UNAUTHENTICATED", None),
        ]);
        let temp = tempdir().unwrap();
        // The classifier reads `has_policy_configured` from the config
        // shape (does the Option contain a path?), not from file
        // existence, so we can hand it a path without writing a real
        // policy file — the bail fires before policy load.
        let policy_path = temp.path().join("server-policy.yaml");
        let config = ServerConfig {
            mode: ServerConfigMode::Multi {
                graphs: vec![GraphStartupConfig {
                    graph_id: "alpha".to_string(),
                    uri: temp
                        .path()
                        .join("alpha.omni")
                        .to_string_lossy()
                        .into_owned(),
                    policy: None,
                    embedding: None,
                    queries: crate::queries::QueryRegistry::default(),
                }],
                config_path: temp.path().join("omnigraph.yaml"),
                server_policy: Some(crate::PolicySource::File(policy_path)),
            },
            bind: "127.0.0.1:0".to_string(),
            allow_unauthenticated: false,
            require_all_graphs: false,
        };
        let result = serve(config).await;
        let err = result
            .expect_err("serve should refuse to start in multi mode with policy but no tokens");
        // Debug formatting is used for the rendered error that is searched.
        let msg = format!("{:?}", err);
        assert!(
            msg.contains("policy file is configured but no bearer tokens"),
            "expected policy-without-tokens rejection in multi mode, got: {msg}",
        );
    }
662
    /// `serve()` must reject a config with no tokens, no policy and no
    /// unauthenticated opt-in (State 1 of the startup matrix).
    #[tokio::test]
    #[serial]
    async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
        // MR-723 PR A: pin the integration boundary that the classifier
        // is actually called by `serve()` before any side-effecting
        // work (Lance dataset open, TcpListener::bind). The classifier
        // itself is unit-tested above; this test guards the propagation
        // path from `classify_server_runtime_state` through serve's
        // `?` so a future refactor that drops the call returns red.
        //
        // Marked `#[serial]` because we have to clear all bearer-token
        // env vars, and another test in this module setting any of them
        // concurrently would corrupt the read inside `resolve_token_source`.
        let _guard = EnvGuard::set(&[
            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
            ("OMNIGRAPH_UNAUTHENTICATED", None),
        ]);
        let temp = tempdir().unwrap();
        // Graph path doesn't need to exist — classifier fires before
        // any engine open.
        let config = ServerConfig {
            mode: ServerConfigMode::Multi {
                graphs: vec![GraphStartupConfig {
                    graph_id: "default".to_string(),
                    uri: temp
                        .path()
                        .join("graph.omni")
                        .to_string_lossy()
                        .into_owned(),
                    policy: None,
                    embedding: None,
                    queries: crate::queries::QueryRegistry::default(),
                }],
                config_path: temp.path().join("cluster"),
                server_policy: None,
            },
            bind: "127.0.0.1:0".to_string(),
            allow_unauthenticated: false,
            require_all_graphs: false,
        };
        let result = serve(config).await;
        let err =
            result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
        // Debug formatting is used for the rendered error that is searched.
        let msg = format!("{:?}", err);
        // Either fragment is accepted; both occur in the classifier's
        // State 1 refusal message.
        assert!(
            msg.contains("no bearer tokens") || msg.contains("policy file"),
            "expected refusal message naming the misconfiguration, got: {msg}",
        );
    }
715
716    #[test]
717    fn classify_policy_enabled_requires_tokens() {
718        // State 3: tokens + policy → PolicyEnabled, regardless of the
719        // `allow_unauthenticated` flag (Cedar evaluates the bearer,
720        // the flag is moot once tokens exist).
721        assert_eq!(
722            classify_server_runtime_state(true, true, false).unwrap(),
723            ServerRuntimeState::PolicyEnabled
724        );
725        assert_eq!(
726            classify_server_runtime_state(true, true, true).unwrap(),
727            ServerRuntimeState::PolicyEnabled
728        );
729    }
730
731    #[test]
732    fn classify_policy_without_tokens_is_rejected() {
733        // Closes the "policy installed but no tokens → silent 401 on
734        // every request" footgun. The same shape that single-mode
735        // `open_with_bearer_tokens_and_policy` used to bail on
736        // privately is now rejected by the classifier so both single
737        // and multi mode get the same enforcement from one source of
738        // truth.
739        for allow_unauthenticated in [false, true] {
740            let err =
741                classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
742            let msg = err.to_string();
743            assert!(
744                msg.contains("policy file is configured but no bearer tokens"),
745                "expected policy-without-tokens rejection message; got: {msg}"
746            );
747            assert!(
748                msg.contains("every request would 401"),
749                "rejection message must name the failure mode; got: {msg}"
750            );
751        }
752    }
753
754    #[test]
755    fn normalize_bearer_token_trims_and_filters_blank_values() {
756        assert_eq!(normalize_bearer_token(None), None);
757        assert_eq!(normalize_bearer_token(Some("   ".to_string())), None);
758        assert_eq!(
759            normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
760            Some("demo-token")
761        );
762    }
763
764    struct EnvGuard {
765        saved: Vec<(&'static str, Option<String>)>,
766    }
767
768    impl EnvGuard {
769        fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
770            let saved = vars
771                .iter()
772                .map(|(name, _)| (*name, env::var(name).ok()))
773                .collect::<Vec<_>>();
774            for (name, value) in vars {
775                unsafe {
776                    match value {
777                        Some(value) => env::set_var(name, value),
778                        None => env::remove_var(name),
779                    }
780                }
781            }
782            Self { saved }
783        }
784    }
785
786    impl Drop for EnvGuard {
787        fn drop(&mut self) {
788            for (name, value) in self.saved.drain(..) {
789                unsafe {
790                    match value {
791                        Some(value) => env::set_var(name, value),
792                        None => env::remove_var(name),
793                    }
794                }
795            }
796        }
797    }
798
799    #[test]
800    fn parse_bearer_tokens_json_reads_actor_token_map() {
801        let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
802        assert_eq!(tokens.len(), 2);
803        assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
804        assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
805    }
806
807    #[test]
808    #[serial]
809    fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
810        let temp = tempdir().unwrap();
811        let tokens_path = temp.path().join("tokens.json");
812        fs::write(
813            &tokens_path,
814            r#"{"team-01":"token-one","team-02":"token-two"}"#,
815        )
816        .unwrap();
817
818        let _guard = EnvGuard::set(&[
819            ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
820            (
821                "OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
822                Some(tokens_path.to_str().unwrap()),
823            ),
824            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
825        ]);
826
827        let tokens = server_bearer_tokens_from_env().unwrap();
828        assert_eq!(
829            tokens,
830            vec![
831                ("default".to_string(), "legacy-token".to_string()),
832                ("team-01".to_string(), "token-one".to_string()),
833                ("team-02".to_string(), "token-two".to_string()),
834            ]
835        );
836    }
837}