Skip to main content

omnigraph_server/
lib.rs

1pub mod api;
2pub mod auth;
3pub mod config;
4pub mod graph_id;
5pub mod identity;
6pub mod policy;
7pub mod queries;
8pub mod registry;
9pub mod workload;
10
11pub use graph_id::GraphId;
12pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
13pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
14
15use crate::queries::{QueryRegistry, check, format_check_breakages};
16
17use std::collections::{HashMap, HashSet};
18use std::fs;
19use std::io;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use api::{
25    BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
26    BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
27    CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
28    HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
29    InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
30    SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
31    schema_apply_output, snapshot_payload,
32};
33pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
34use axum::body::{Body, Bytes};
35use axum::extract::DefaultBodyLimit;
36use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
37use axum::http::StatusCode;
38use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
39use axum::middleware::{self, Next};
40use axum::response::{IntoResponse, Response};
41use axum::routing::{delete, get, post};
42use axum::{Json, Router};
43use color_eyre::eyre::{Result, WrapErr, bail};
44pub use config::{
45    AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
46    ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
47    graph_resource_id_for_selection, load_config,
48};
49use futures::stream;
50use omnigraph::db::{Omnigraph, ReadTarget};
51use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
52use omnigraph::storage::normalize_root_uri;
53use omnigraph_compiler::catalog::Catalog;
54use omnigraph_compiler::json_params_to_param_map;
55use omnigraph_compiler::query::parser::parse_query;
56use omnigraph_compiler::{JsonParamMode, ParamMap};
57pub use policy::{
58    PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
59    PolicyRequest, PolicyResourceKind, PolicyTestConfig,
60};
61use serde::Deserialize;
62use serde_json::Value;
63use sha2::{Digest, Sha256};
64use subtle::ConstantTimeEq;
65use tokio::net::TcpListener;
66use tokio::sync::mpsc;
67use tower_http::trace::TraceLayer;
68use tracing::{error, info, warn};
69use tracing_subscriber::EnvFilter;
70use utoipa::OpenApi;
71use utoipa::openapi::path::{Parameter, ParameterIn};
72use utoipa::openapi::schema::{Object, Type};
73use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
74
75type BearerTokenHash = [u8; 32];
76
77fn hash_bearer_token(token: &str) -> BearerTokenHash {
78    let digest = Sha256::digest(token.as_bytes());
79    let mut out = [0u8; 32];
80    out.copy_from_slice(&digest);
81    out
82}
83
// OpenAPI document root for the server. The `paths(...)` list enumerates the
// HTTP handlers included in the generated spec, and `SecurityAddon` is run as
// a modifier to register the `bearer_token` security scheme.
#[derive(OpenApi)]
#[openapi(
    info(
        title = "Omnigraph API",
        description = "HTTP API for the Omnigraph graph database",
    ),
    paths(
        server_health,
        server_graphs_list,
        server_snapshot,
        // deprecated; the #[deprecated] attribute on the handler
        // surfaces as `deprecated: true` on the OpenAPI operation.
        #[allow(deprecated)] server_read,
        server_query,
        server_export,
        #[allow(deprecated)] server_change,
        server_mutate,
        server_list_queries,
        server_invoke_query,
        server_schema_apply,
        server_schema_get,
        server_ingest,
        server_branch_list,
        server_branch_create,
        server_branch_delete,
        server_branch_merge,
        server_commit_list,
        server_commit_show,
    ),
    modifiers(&SecurityAddon),
)]
pub struct ApiDoc;
116
117struct SecurityAddon;
118
119impl utoipa::Modify for SecurityAddon {
120    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
121        openapi
122            .components
123            .get_or_insert_with(Default::default)
124            .add_security_scheme(
125                "bearer_token",
126                SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
127            );
128    }
129}
130
/// Default request body size limit: 1 MiB.
// NOTE(review): presumably applied to the router via `DefaultBodyLimit`;
// the use site is not visible in this part of the file.
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1 << 20;
/// Request body size limit for ingest requests: 32 MiB.
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 << 20;
/// Crate version, captured from `CARGO_PKG_VERSION` at compile time.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Optional source revision, captured from the `OMNIGRAPH_SOURCE_VERSION`
/// environment variable at compile time; `None` when the variable was not
/// set during the build.
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
135
/// Startup configuration for the server: which graph(s) to open, where to
/// listen, and whether running without any authentication is permitted.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Server topology + the graphs to open at startup. Single-mode
    /// invocations (`omnigraph-server <URI>` or `--target <name>`)
    /// produce `ServerConfigMode::Single`; multi-mode invocations
    /// (`--config omnigraph.yaml` with a non-empty `graphs:` map and
    /// no single-mode selector) produce `ServerConfigMode::Multi`.
    pub mode: ServerConfigMode,
    /// Address the server listens on.
    // NOTE(review): presumably a `host:port` string handed to the TCP
    // listener; the exact format is not visible here — confirm in `serve()`.
    pub bind: String,
    /// Operator opt-in for fully-unauthenticated dev mode (MR-723).
    /// When neither bearer tokens nor a policy file are configured,
    /// `serve()` refuses to start unless this is true (set via
    /// `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1`). The
    /// motivation is that "no tokens + no policy" looks like protection
    /// (no Cedar errors at boot) but is actually fully open — operators
    /// who set up auth and forgot the policy file would otherwise ship
    /// the illusion of protection.
    pub allow_unauthenticated: bool,
}
155
/// What `load_server_settings` produces after applying the four-rule
/// mode inference matrix (MR-668 decision 2).
///
/// The two variants carry everything needed to open the graph(s) for the
/// corresponding deployment shape.
#[derive(Debug, Clone)]
pub enum ServerConfigMode {
    /// Legacy invocation — one graph at the given URI. Either:
    ///   * `omnigraph-server <URI>` (CLI positional), or
    ///   * `omnigraph-server --target <name> --config omnigraph.yaml`, or
    ///   * `omnigraph-server --config omnigraph.yaml` with `server.graph`
    ///     set to a named target.
    Single {
        /// URI of the single graph to open.
        uri: String,
        /// Cedar graph resource id for the single graph. A named selection
        /// uses the graph name; an anonymous URI uses the normalized URI to
        /// preserve legacy single-graph policy identity.
        graph_id: String,
        /// Top-level `policy.file` (single-graph Cedar policy).
        policy_file: Option<PathBuf>,
        /// Top-level stored-query registry, loaded and identity-checked
        /// at settings-build time; type-checked against the schema when
        /// the engine opens.
        queries: QueryRegistry,
    },
    /// Multi-graph invocation — `--config omnigraph.yaml` with a
    /// non-empty `graphs:` map and no single-mode selector.
    Multi {
        /// Per-graph startup configs, sorted by graph id (BTreeMap
        /// iteration order). The parallel-open loop iterates this.
        graphs: Vec<GraphStartupConfig>,
        /// Path to the config file the server was started from. Kept on
        /// the mode so future runtime mutation (deferred — see release
        /// notes) can locate the source of truth without re-parsing CLI
        /// args.
        config_path: PathBuf,
        /// `server.policy.file` (server-level Cedar policy for the
        /// management endpoints). Wired into `GET /graphs` authorization.
        server_policy_file: Option<PathBuf>,
    },
}
194
/// One graph's startup-time configuration: id, opened URI, optional
/// per-graph policy file path. Constructed by `load_server_settings`
/// in multi mode; consumed by `serve`'s parallel open loop.
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
    /// Identifier of the graph within the multi-graph deployment.
    pub graph_id: String,
    /// URI the graph is opened from.
    pub uri: String,
    /// Optional path to this graph's policy file; `None` means no
    /// per-graph policy file is configured.
    pub policy_file: Option<PathBuf>,
    /// Per-graph stored-query registry, loaded and identity-checked at
    /// settings-build time; type-checked against the schema when this
    /// graph's engine opens.
    pub queries: QueryRegistry,
}
208
/// Runtime routing for the server. Single mode = legacy
/// `omnigraph-server <URI>` invocation, one graph, flat HTTP routes.
/// Multi mode = `--config omnigraph.yaml` with a non-empty `graphs:`
/// map, N graphs, cluster routes (`/graphs/{graph_id}/...`). Mode is
/// determined at startup by `load_server_settings`.
///
/// In single mode the handle lives here directly — there is no
/// registry, no sentinel key, no walk-and-assert. In multi mode the
/// registry carries N handles and the middleware dispatches on the
/// URL's `{graph_id}` segment.
///
/// Both modes share the same handler bodies — the routing middleware
/// (`resolve_graph_handle`) injects `Arc<GraphHandle>` as a request
/// extension so handlers never see the routing discriminator.
#[derive(Clone)]
pub enum GraphRouting {
    /// Single-graph deployment: one handle, flat routes (`/snapshot`,
    /// `/read`, …). The `handle.uri` field carries the URI the engine
    /// was opened from. Backward compatible with v0.6.0 deployments.
    Single { handle: Arc<GraphHandle> },
    /// Multi-graph deployment: many handles, cluster routes
    /// (`/graphs/{graph_id}/...`). `config_path` is the `omnigraph.yaml`
    /// the server reads at startup; preserved here so future runtime
    /// mutation (deferred) can find the source of truth without
    /// re-parsing CLI args. The server treats the file as
    /// operator-owned and never writes it.
    ///
    /// `config_path` is optional: `AppState::new_multi` stores whatever
    /// the caller passes, including `None`.
    Multi {
        registry: Arc<GraphRegistry>,
        config_path: Option<PathBuf>,
    },
}
240
/// Shared server state. Cloning is cheap relative to the data it refers to:
/// the workload controller, token table and server policy are held behind
/// `Arc`, and the routing enum holds `Arc` handles.
#[derive(Clone)]
pub struct AppState {
    /// Runtime routing — the single source of truth for where each
    /// request's graph lives. Single mode holds the handle directly;
    /// multi mode holds the registry + config path. Both arms are
    /// the same shape from a handler's perspective: middleware
    /// extracts an `Arc<GraphHandle>` and injects it as a request
    /// extension.
    routing: GraphRouting,
    /// Per-actor admission control. Process-wide (not per-graph) —
    /// see MR-668 decision Q6.
    workload: Arc<workload::WorkloadController>,
    /// Accepted bearer tokens, stored as `(SHA-256 digest of the token,
    /// actor name)` pairs. The raw token strings are not kept; see
    /// `hash_bearer_tokens`.
    bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
    /// Server-level Cedar policy. Used by management endpoints (`POST
    /// /graphs`, `GET /graphs`) which act on the registry resource,
    /// not on a per-graph resource. Loaded from `server.policy.file`
    /// in `omnigraph.yaml`. `None` outside multi mode and when no
    /// server policy is configured. Per-graph policies live on each
    /// `GraphHandle.policy`.
    // NOTE(review): a comment in `ApiError::from_omni` says `POST /graphs`
    // was pulled, so the mention above may be stale — confirm.
    server_policy: Option<Arc<PolicyEngine>>,
}
262
263struct ExportStreamWriter {
264    sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
265}
266
267impl Write for ExportStreamWriter {
268    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
269        self.sender
270            .send(Ok(Bytes::copy_from_slice(buf)))
271            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
272        Ok(buf.len())
273    }
274
275    fn flush(&mut self) -> io::Result<()> {
276        Ok(())
277    }
278}
279
/// Error value returned by HTTP handlers: an HTTP status, a machine-readable
/// error code, a human-readable message, and optional conflict detail.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status code for the response.
    status: StatusCode,
    /// API-level error code paired with the status.
    code: ErrorCode,
    /// Human-readable description of the failure.
    message: String,
    /// Per-conflict detail; populated only by the `merge_conflict`
    /// constructor and empty for every other error.
    merge_conflicts: Vec<api::MergeConflictOutput>,
    /// Version-mismatch detail; populated only by the
    /// `manifest_version_conflict` constructor and `None` otherwise.
    manifest_conflict: Option<api::ManifestConflictOutput>,
}
288
289impl AppState {
290    /// Canonical single-mode constructor. Every other `new_*` / `open_*`
291    /// helper is a thin convenience wrapper around this one. Builds the
292    /// engine + per-graph policy through `build_single_mode`, which
293    /// applies `Omnigraph::with_policy` so HTTP-layer and engine-layer
294    /// policy can never diverge — there is no "policy installed on HTTP
295    /// but not on engine" representable state (closes the prior
296    /// `with_policy_engine` footgun that reused the engine `Arc`
297    /// without re-applying `with_policy`).
298    pub fn new_single(
299        uri: String,
300        db: Omnigraph,
301        bearer_tokens: Vec<(String, String)>,
302        policy_engine: Option<PolicyEngine>,
303        workload: workload::WorkloadController,
304    ) -> Self {
305        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
306        let per_graph_policy = policy_engine.map(Arc::new);
307        Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
308    }
309
310    /// Like `new_single`, but attaches a pre-validated stored-query
311    /// registry. Private — the production single-mode boot path
312    /// (`open_single_with_queries`) is the only caller; every public
313    /// `new_*` constructor builds with no stored queries.
314    fn new_single_with_queries(
315        uri: String,
316        db: Omnigraph,
317        bearer_tokens: Vec<(String, String)>,
318        policy_engine: Option<PolicyEngine>,
319        workload: workload::WorkloadController,
320        queries: Option<Arc<QueryRegistry>>,
321    ) -> Self {
322        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
323        let per_graph_policy = policy_engine.map(Arc::new);
324        Self::build_single_mode(
325            uri,
326            db,
327            bearer_tokens,
328            per_graph_policy,
329            Arc::new(workload),
330            queries,
331        )
332    }
333
334    pub fn new(uri: String, db: Omnigraph) -> Self {
335        Self::new_single(
336            uri,
337            db,
338            Vec::new(),
339            None,
340            workload::WorkloadController::from_env(),
341        )
342    }
343
344    pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
345        let bearer_tokens = normalize_bearer_token(bearer_token)
346            .into_iter()
347            .map(|token| ("default".to_string(), token))
348            .collect();
349        Self::new_with_bearer_tokens(uri, db, bearer_tokens)
350    }
351
352    pub fn new_with_bearer_tokens(
353        uri: String,
354        db: Omnigraph,
355        bearer_tokens: Vec<(String, String)>,
356    ) -> Self {
357        Self::new_single(
358            uri,
359            db,
360            bearer_tokens,
361            None,
362            workload::WorkloadController::from_env(),
363        )
364    }
365
366    pub fn new_with_bearer_tokens_and_policy(
367        uri: String,
368        db: Omnigraph,
369        bearer_tokens: Vec<(String, String)>,
370        policy_engine: Option<PolicyEngine>,
371    ) -> Self {
372        Self::new_single(
373            uri,
374            db,
375            bearer_tokens,
376            policy_engine,
377            workload::WorkloadController::from_env(),
378        )
379    }
380
381    /// Construct with a caller-provided [`workload::WorkloadController`].
382    /// Tests and benches use this to override per-actor caps without
383    /// mutating global env vars (unsafe in Rust 2024 once the async
384    /// runtime is up — `setenv` isn't thread-safe). For tests that also
385    /// need a custom `PolicyEngine`, use [`new_single`] directly.
386    pub fn new_with_workload(
387        uri: String,
388        db: Omnigraph,
389        bearer_tokens: Vec<(String, String)>,
390        workload: workload::WorkloadController,
391    ) -> Self {
392        Self::new_single(uri, db, bearer_tokens, None, workload)
393    }
394
395    pub async fn open(uri: impl Into<String>) -> Result<Self> {
396        Self::open_with_bearer_token(uri, None).await
397    }
398
399    pub async fn open_with_bearer_token(
400        uri: impl Into<String>,
401        bearer_token: Option<String>,
402    ) -> Result<Self> {
403        let bearer_tokens = normalize_bearer_token(bearer_token)
404            .into_iter()
405            .map(|token| ("default".to_string(), token))
406            .collect();
407        Self::open_with_bearer_tokens(uri, bearer_tokens).await
408    }
409
410    pub async fn open_with_bearer_tokens(
411        uri: impl Into<String>,
412        bearer_tokens: Vec<(String, String)>,
413    ) -> Result<Self> {
414        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
415        let db = Omnigraph::open(&uri).await?;
416        Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
417    }
418
419    pub async fn open_with_bearer_tokens_and_policy(
420        uri: impl Into<String>,
421        bearer_tokens: Vec<(String, String)>,
422        policy_file: Option<&PathBuf>,
423    ) -> Result<Self> {
424        Self::open_single_with_queries(
425            uri,
426            bearer_tokens,
427            policy_file,
428            QueryRegistry::default(),
429        )
430        .await
431    }
432
433    /// Single-mode boot with a stored-query registry: open the engine,
434    /// **type-check the registry against the live schema and refuse to
435    /// start on a breakage** (same posture as bad policy YAML), log
436    /// non-blocking warnings, then attach the registry to the handle.
437    /// With an empty registry the check is a no-op and no registry is
438    /// attached — that is the path `open_with_bearer_tokens_and_policy`
439    /// (no stored queries) takes.
440    pub async fn open_single_with_queries(
441        uri: impl Into<String>,
442        bearer_tokens: Vec<(String, String)>,
443        policy_file: Option<&PathBuf>,
444        queries: QueryRegistry,
445    ) -> Result<Self> {
446        Self::open_single_with_queries_for_graph_id(uri, bearer_tokens, policy_file, queries, None)
447            .await
448    }
449
450    async fn open_single_with_queries_for_graph_id(
451        uri: impl Into<String>,
452        bearer_tokens: Vec<(String, String)>,
453        policy_file: Option<&PathBuf>,
454        queries: QueryRegistry,
455        graph_id: Option<String>,
456    ) -> Result<Self> {
457        // The "policy requires tokens" invariant is enforced once by
458        // `classify_server_runtime_state` in `serve()`, before either
459        // single-mode or multi-mode construction is reached. By the
460        // time we get here, the (policy, no-tokens) combination has
461        // already been rejected — no second bail needed.
462        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
463        let graph_id = graph_id.unwrap_or_else(|| uri.clone());
464        let db = Omnigraph::open(&uri).await?;
465
466        // Validate the registry against the live schema and resolve it to
467        // an attachable handle (refuse boot on breakage).
468        let registry = validate_and_attach(queries, &db.catalog(), &graph_id)?;
469
470        let policy_engine = match policy_file {
471            Some(path) => Some(PolicyEngine::load_graph(path, &graph_id)?),
472            None => None,
473        };
474        Ok(Self::new_single_with_queries(
475            uri,
476            db,
477            bearer_tokens,
478            policy_engine,
479            workload::WorkloadController::from_env(),
480            registry,
481        ))
482    }
483
484    /// Single-mode shared construction: wraps the bare engine + per-graph
485    /// policy in a `GraphHandle` carried directly by `GraphRouting::Single`.
486    /// Per-graph policy enforcement on the engine (MR-722) is re-applied
487    /// via `Omnigraph::with_policy` so HTTP and engine layers can never
488    /// diverge.
489    fn build_single_mode(
490        uri: String,
491        db: Omnigraph,
492        bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
493        policy_engine: Option<Arc<PolicyEngine>>,
494        workload: Arc<workload::WorkloadController>,
495        queries: Option<Arc<QueryRegistry>>,
496    ) -> Self {
497        // Engine-layer policy gate (MR-722). With a per-graph policy
498        // installed, every `_as` writer on `Omnigraph` calls into the
499        // PolicyChecker. HTTP-layer `authorize_request` is the first
500        // gate; engine-layer is the redundant-but-correct backstop.
501        let db = if let Some(policy) = policy_engine.as_ref() {
502            let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
503            db.with_policy(checker)
504        } else {
505            db
506        };
507        // `GraphHandle.key` is required by the struct, but in single
508        // mode it is never a registry key (there's no registry) and
509        // never compared against user input (routes are flat, no
510        // `{graph_id}` parameter). The label appears only in tracing
511        // output from `resolve_graph_handle`. The literal below is a
512        // log label, not a routing key — when the future cluster
513        // catalog ships, single mode may carry the catalog-assigned
514        // id here instead.
515        let uri = normalize_root_uri(&uri).unwrap_or(uri);
516        let key = GraphKey::cluster(
517            GraphId::try_from("default").expect("'default' is a valid GraphId log label"),
518        );
519        let handle = Arc::new(GraphHandle {
520            key,
521            uri,
522            engine: Arc::new(db),
523            policy: policy_engine,
524            queries,
525        });
526        Self {
527            routing: GraphRouting::Single { handle },
528            workload,
529            bearer_tokens,
530            server_policy: None,
531        }
532    }
533
534    /// Multi-mode constructor — used by the startup loop. Operators
535    /// reach this by invoking `omnigraph-server --config omnigraph.yaml`
536    /// with a non-empty `graphs:` map.
537    ///
538    /// Caller supplies the already-opened `GraphHandle`s and (optionally)
539    /// the path to the source config file. `server_policy` is loaded
540    /// from `server.policy.file` if configured.
541    pub fn new_multi(
542        handles: Vec<Arc<GraphHandle>>,
543        bearer_tokens: Vec<(String, String)>,
544        server_policy: Option<PolicyEngine>,
545        workload: workload::WorkloadController,
546        config_path: Option<PathBuf>,
547    ) -> std::result::Result<Self, InsertError> {
548        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
549        let registry = Arc::new(GraphRegistry::from_handles(handles)?);
550        Ok(Self {
551            routing: GraphRouting::Multi {
552                registry,
553                config_path,
554            },
555            workload: Arc::new(workload),
556            bearer_tokens,
557            server_policy: server_policy.map(Arc::new),
558        })
559    }
560
561    /// Runtime routing accessor. Handlers don't typically inspect this —
562    /// they extract `Arc<GraphHandle>` via the routing middleware — but
563    /// `build_app` matches on it to decide flat vs nested route
564    /// mounting, and a handful of management endpoints (`GET /graphs`,
565    /// the OpenAPI cluster rewrite) match on the discriminant.
566    pub fn routing(&self) -> &GraphRouting {
567        &self.routing
568    }
569
570    fn requires_bearer_auth(&self) -> bool {
571        if !self.bearer_tokens.is_empty() {
572            return true;
573        }
574        if self.server_policy.is_some() {
575            return true;
576        }
577        // Any per-graph policy also requires auth — otherwise the
578        // policy gate would receive unauthenticated requests. Reading
579        // from `routing` is O(1) in both arms: single mode is a direct
580        // `handle.policy.is_some()` check, multi mode reads the
581        // cached `any_per_graph_policy` flag on the registry snapshot.
582        match &self.routing {
583            GraphRouting::Single { handle } => handle.policy.is_some(),
584            GraphRouting::Multi { registry, .. } => registry.snapshot_ref().any_per_graph_policy,
585        }
586    }
587
588    fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
589        // Hash the incoming token and compare against every stored digest in
590        // constant time. Iterate all entries unconditionally so total work —
591        // and therefore response timing — doesn't depend on which slot matches.
592        let provided_hash = hash_bearer_token(provided_token);
593        let mut matched: Option<Arc<str>> = None;
594        for (hash, actor) in self.bearer_tokens.iter() {
595            if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
596                matched = Some(Arc::clone(actor));
597            }
598        }
599        matched.map(ResolvedActor::cluster_static)
600    }
601}
602
603fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
604    let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
605        .into_iter()
606        .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
607        .collect();
608    Arc::from(tokens)
609}
610
611impl ApiError {
612    pub fn unauthorized(message: impl Into<String>) -> Self {
613        Self {
614            status: StatusCode::UNAUTHORIZED,
615            code: ErrorCode::Unauthorized,
616            message: message.into(),
617            merge_conflicts: Vec::new(),
618            manifest_conflict: None,
619        }
620    }
621
622    pub fn forbidden(message: impl Into<String>) -> Self {
623        Self {
624            status: StatusCode::FORBIDDEN,
625            code: ErrorCode::Forbidden,
626            message: message.into(),
627            merge_conflicts: Vec::new(),
628            manifest_conflict: None,
629        }
630    }
631
632    pub fn bad_request(message: impl Into<String>) -> Self {
633        Self {
634            status: StatusCode::BAD_REQUEST,
635            code: ErrorCode::BadRequest,
636            message: message.into(),
637            merge_conflicts: Vec::new(),
638            manifest_conflict: None,
639        }
640    }
641
642    pub fn not_found(message: impl Into<String>) -> Self {
643        Self {
644            status: StatusCode::NOT_FOUND,
645            code: ErrorCode::NotFound,
646            message: message.into(),
647            merge_conflicts: Vec::new(),
648            manifest_conflict: None,
649        }
650    }
651
652    /// HTTP 405 Method Not Allowed. Used when the route is mounted but
653    /// the active server mode doesn't serve it (`GET /graphs` in
654    /// single-graph mode returns this instead of 404 so clients can
655    /// distinguish "wrong context" from "no such resource").
656    pub fn method_not_allowed(message: impl Into<String>) -> Self {
657        Self {
658            status: StatusCode::METHOD_NOT_ALLOWED,
659            code: ErrorCode::MethodNotAllowed,
660            message: message.into(),
661            merge_conflicts: Vec::new(),
662            manifest_conflict: None,
663        }
664    }
665
666    pub fn conflict(message: impl Into<String>) -> Self {
667        Self {
668            status: StatusCode::CONFLICT,
669            code: ErrorCode::Conflict,
670            message: message.into(),
671            merge_conflicts: Vec::new(),
672            manifest_conflict: None,
673        }
674    }
675
676    pub fn internal(message: impl Into<String>) -> Self {
677        Self {
678            status: StatusCode::INTERNAL_SERVER_ERROR,
679            code: ErrorCode::Internal,
680            message: message.into(),
681            merge_conflicts: Vec::new(),
682            manifest_conflict: None,
683        }
684    }
685
686    /// HTTP 429 Too Many Requests — actor exceeded their per-actor
687    /// admission cap (count or byte budget). Clients should respect the
688    /// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded`
689    /// and `RejectReason::ByteBudgetExceeded`.
690    pub fn too_many_requests(message: impl Into<String>) -> Self {
691        Self {
692            status: StatusCode::TOO_MANY_REQUESTS,
693            code: ErrorCode::TooManyRequests,
694            message: message.into(),
695            merge_conflicts: Vec::new(),
696            manifest_conflict: None,
697        }
698    }
699
700    /// Convert a `WorkloadController` rejection into the matching
701    /// `ApiError` variant.
702    pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
703        match reject {
704            workload::RejectReason::InFlightCountExceeded { .. }
705            | workload::RejectReason::ByteBudgetExceeded { .. } => {
706                Self::too_many_requests(reject.to_string())
707            }
708        }
709    }
710
711    fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
712        Self {
713            status: StatusCode::CONFLICT,
714            code: ErrorCode::Conflict,
715            message: summarize_merge_conflicts(&conflicts),
716            merge_conflicts: conflicts,
717            manifest_conflict: None,
718        }
719    }
720
721    fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
722        Self {
723            status: StatusCode::CONFLICT,
724            code: ErrorCode::Conflict,
725            message,
726            merge_conflicts: Vec::new(),
727            manifest_conflict: Some(details),
728        }
729    }
730
731    fn from_omni(err: OmniError) -> Self {
732        match err {
733            OmniError::Compiler(err) => Self::bad_request(err.to_string()),
734            OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
735            OmniError::Manifest(err) => match err.kind {
736                ManifestErrorKind::BadRequest => Self::bad_request(err.message),
737                ManifestErrorKind::NotFound => Self::not_found(err.message),
738                ManifestErrorKind::Conflict => match err.details {
739                    Some(ManifestConflictDetails::ExpectedVersionMismatch {
740                        table_key,
741                        expected,
742                        actual,
743                    }) => Self::manifest_version_conflict(
744                        err.message,
745                        api::ManifestConflictOutput {
746                            table_key,
747                            expected,
748                            actual,
749                        },
750                    ),
751                    _ => Self::conflict(err.message),
752                },
753                ManifestErrorKind::Internal => Self::internal(err.message),
754            },
755            OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
756                conflicts
757                    .iter()
758                    .map(api::MergeConflictOutput::from)
759                    .collect(),
760            ),
761            OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
762            OmniError::Io(err) => Self::internal(format!("io: {err}")),
763            // Engine-layer policy enforcement (MR-722). All denials and
764            // evaluation failures surface here as 403. The HTTP-layer
765            // `authorize_request` already distinguishes 401 (missing
766            // bearer) from 403 (policy denial), so by the time the
767            // engine gate fires, the bearer is valid — any failure from
768            // the engine is a policy outcome, not an auth one.
769            OmniError::Policy(message) => Self::forbidden(message),
770            // `Omnigraph::init` against an existing graph URI in strict
771            // mode. Not currently HTTP-reachable (POST /graphs was
772            // pulled), but mapping is wired so the variant has a
773            // single canonical translation when a future runtime
774            // create endpoint lands.
775            err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
776        }
777    }
778}
779
780fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
781    if conflicts.is_empty() {
782        return "merge conflicts".to_string();
783    }
784
785    let preview: Vec<String> = conflicts
786        .iter()
787        .take(3)
788        .map(|conflict| match conflict.row_id.as_deref() {
789            Some(row_id) => format!(
790                "{}:{} ({})",
791                conflict.table_key,
792                row_id,
793                conflict.kind.as_str()
794            ),
795            None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
796        })
797        .collect();
798
799    let suffix = if conflicts.len() > preview.len() {
800        format!("; and {} more", conflicts.len() - preview.len())
801    } else {
802        String::new()
803    };
804
805    format!("merge conflicts: {}{}", preview.join("; "), suffix)
806}
807
808/// Constant `Retry-After` value (seconds) emitted on 429 responses.
809const RETRY_AFTER_SECONDS: &str = "60";
810
811impl IntoResponse for ApiError {
812    fn into_response(self) -> Response {
813        let mut headers = axum::http::HeaderMap::new();
814        if matches!(self.code, ErrorCode::TooManyRequests) {
815            headers.insert(
816                axum::http::header::RETRY_AFTER,
817                axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
818            );
819        }
820        (
821            self.status,
822            headers,
823            Json(ErrorOutput {
824                error: self.message,
825                code: Some(self.code),
826                merge_conflicts: self.merge_conflicts,
827                manifest_conflict: self.manifest_conflict,
828            }),
829        )
830            .into_response()
831    }
832}
833
834pub fn init_tracing() {
835    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
836    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
837}
838
839/// Log each non-blocking advisory from a registry check report.
840fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
841    for warning in &report.warnings {
842        warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
843    }
844}
845
846fn validate_registry_against_catalog(
847    registry: &QueryRegistry,
848    catalog: &Catalog,
849    label: &str,
850) -> omnigraph::error::Result<()> {
851    let report = check(registry, catalog);
852    if report.has_breakages() {
853        return Err(OmniError::manifest(format_check_breakages(label, &report)));
854    }
855    log_registry_warnings(label, &report);
856    Ok(())
857}
858
859/// Validate a loaded stored-query registry against the live schema and
860/// resolve it to an attachable handle. Refuses boot on any breakage
861/// (same posture as bad policy YAML), logs the non-blocking warnings,
862/// and collapses an empty registry to `None` (nothing attached). This is
863/// the single gate every open path funnels through, so no opener can
864/// attach a registry that has not been schema-checked. `label` names the
865/// graph in messages.
866fn validate_and_attach(
867    queries: QueryRegistry,
868    catalog: &Catalog,
869    label: &str,
870) -> Result<Option<Arc<QueryRegistry>>> {
871    validate_registry_against_catalog(&queries, catalog, label)
872        .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
873    Ok(if queries.is_empty() {
874        None
875    } else {
876        Some(Arc::new(queries))
877    })
878}
879
880/// Format every load error (parse / identity failure) into a multi-line
881/// boot-abort message.
882fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> String {
883    let joined = errors
884        .iter()
885        .map(|e| e.to_string())
886        .collect::<Vec<_>>()
887        .join("\n  ");
888    format!("graph '{label}': stored-query registry failed to load:\n  {joined}")
889}
890
891pub fn load_server_settings(
892    config_path: Option<&PathBuf>,
893    cli_uri: Option<String>,
894    cli_target: Option<String>,
895    cli_bind: Option<String>,
896    cli_allow_unauthenticated: bool,
897) -> Result<ServerConfig> {
898    let config = load_config(config_path)?;
899    let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
900    // Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
901    // this. Treat any non-empty, non-"0"/"false" string as truthy —
902    // standard 12-factor "any value is true" reading of the env var.
903    let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
904        .ok()
905        .map(|v| {
906            let trimmed = v.trim();
907            !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
908        })
909        .unwrap_or(false);
910    let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
911
912    // MR-668 decision 2 — four-rule mode inference matrix.
913    //
914    //   1. CLI `<URI>` positional        → Single (URI = the value)
915    //   2. CLI `--target <name>`         → Single (URI = graphs.<name>.uri)
916    //   3. `server.graph` in config      → Single (URI = graphs.<server.graph>.uri)
917    //   4. `--config` + non-empty `graphs:` + no single-mode selector
918    //                                    → Multi (every entry in `graphs:`)
919    //   5. otherwise                     → error with migration hint
920    //
921    // Rules 1-3 are mutually compatible (CLI URI wins over `--target`
922    // wins over `server.graph`), reusing the existing
923    // `resolve_target_uri` precedence.
924    let has_cli_uri = cli_uri.is_some();
925    let has_cli_target = cli_target.is_some();
926    let has_server_graph = config.server_graph_name().is_some();
927    let has_graphs_map = !config.graphs.is_empty();
928    let has_explicit_config = config_path.is_some();
929
930    let mode = if has_cli_uri || has_cli_target || has_server_graph {
931        // Rules 1, 2, or 3 → Single mode.
932        let raw_uri = config.resolve_target_uri(
933            cli_uri,
934            cli_target.as_deref(),
935            config.server_graph_name(),
936        )?;
937        let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
938            format!("normalize single-graph URI '{raw_uri}' from server settings")
939        })?;
940        // Config follows graph IDENTITY, not mode: a bare URI is anonymous
941        // (top-level config); a graph chosen by name uses its per-graph
942        // `graphs.<name>.{policy,queries}`. `resolve_target_uri` already
943        // errored on an unknown name, so a `Some(name)` here is a known graph.
944        let selected: Option<&str> = if has_cli_uri {
945            None
946        } else {
947            cli_target.as_deref().or_else(|| config.server_graph_name())
948        };
949        // A named selection must not leave a populated top-level block
950        // silently unused — refuse boot and point at the per-graph block. The
951        // same rule the CLI selection gate enforces, shared via one helper so
952        // the boot check and `omnigraph queries validate`/`list` can't drift.
953        config.ensure_top_level_blocks_honored(selected)?;
954        // Load + identity-check now (no engine needed); the schema
955        // type-check happens when the engine opens.
956        let policy_file = config.resolve_policy_file_for(selected);
957        let queries = QueryRegistry::load(&config, config.query_entries_for(selected))
958            .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(&uri, &errs)))?;
959        let graph_id = graph_resource_id_for_selection(selected, &uri);
960        ServerConfigMode::Single {
961            uri,
962            graph_id,
963            policy_file,
964            queries,
965        }
966    } else if has_explicit_config && has_graphs_map {
967        // Multi mode: every graph uses its per-graph block; top-level
968        // policy/queries are never honored, so a populated one is an error.
969        let unhonored = config.populated_top_level_blocks();
970        if !unhonored.is_empty() {
971            bail!(
972                "multi-graph mode: top-level {} {} not honored — each graph uses its own \
973                 `graphs.<graph_id>.…` block. Move per-graph rules there (and any \
974                 `graph_list` policy to `server.policy.file`).",
975                unhonored.join(" and "),
976                if unhonored.len() == 1 { "is" } else { "are" },
977            );
978        }
979        // Rule 4 → Multi mode. Build a startup config per graph.
980        let mut graphs = Vec::with_capacity(config.graphs.len());
981        for (name, target) in &config.graphs {
982            // Validate the graph id can construct a `GraphId` newtype.
983            // Doing this here (not at registry insert) so a malformed
984            // omnigraph.yaml fails at startup with a clear error.
985            GraphId::try_from(name.clone()).map_err(|err| {
986                color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
987            })?;
988            let raw_uri = config.resolve_uri_value(&target.uri);
989            let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
990                format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
991            })?;
992            // Per-graph `queries:`, selected through the shared
993            // `query_entries_for` so server and CLI resolve identically.
994            // Load + identity-check now; the schema type-check happens
995            // when this graph's engine opens.
996            let queries = QueryRegistry::load(&config, config.query_entries_for(Some(name.as_str())))
997                .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?;
998            graphs.push(GraphStartupConfig {
999                graph_id: name.clone(),
1000                uri,
1001                policy_file: config.resolve_target_policy_file(name),
1002                queries,
1003            });
1004        }
1005        let config_path = config_path
1006            .cloned()
1007            .expect("has_explicit_config implies config_path is Some");
1008        let server_policy_file = config.resolve_server_policy_file();
1009        ServerConfigMode::Multi {
1010            graphs,
1011            config_path,
1012            server_policy_file,
1013        }
1014    } else {
1015        // Rule 5 → error with migration hint.
1016        bail!(
1017            "no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
1018             (`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
1019             omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
1020             file referenced by `--config`."
1021        );
1022    };
1023
1024    Ok(ServerConfig {
1025        mode,
1026        bind,
1027        allow_unauthenticated,
1028    })
1029}
1030
1031/// Whether the loaded config will run the server in multi-graph mode.
1032/// Useful for the test that constructs `ServerConfig` directly.
1033pub fn server_config_is_multi(config: &ServerConfig) -> bool {
1034    matches!(config.mode, ServerConfigMode::Multi { .. })
1035}
1036
1037/// MR-723 server runtime state, classified from the three-state matrix
1038/// of (bearer tokens configured) × (policy file configured) at startup.
1039///
1040/// * **Open** — neither tokens nor policy; requires explicit
1041///   `allow_unauthenticated`. Effectively a "trust the network" dev
1042///   mode. `serve()` refuses to start in this shape without the flag,
1043///   so the only way to reach this state at runtime is via deliberate
1044///   operator opt-in.
1045/// * **DefaultDeny** — tokens configured but no policy file. The
1046///   server requires a valid bearer token; once authenticated, every
1047///   action except `Read` is denied with 403. Closes the "tokens but
1048///   forgot the policy file" trap.
1049/// * **PolicyEnabled** — policy file configured and at least one
1050///   bearer token configured. Cedar evaluates every authenticated
1051///   request. Policy without tokens is rejected at startup —
1052///   such a server would 401 every request, which is bug-shaped
1053///   rather than feature-shaped (operators wanting "deny all
1054///   unauthenticated traffic" should configure tokens plus a
1055///   deny-all policy to get meaningful 403s with policy-decision
1056///   logging instead).
1057#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1058pub enum ServerRuntimeState {
1059    Open,
1060    DefaultDeny,
1061    PolicyEnabled,
1062}
1063
1064/// Compute the [`ServerRuntimeState`] from the configured inputs.
1065/// Pulled out as a pure function so the matrix is unit-testable
1066/// without standing up the full server.
1067///
1068/// The classifier is the **single source of truth** for "should we
1069/// start?" — both `serve()`'s single-mode and multi-mode branches
1070/// call this before constructing their `AppState`. Adding a startup
1071/// invariant here means both modes enforce it automatically; the
1072/// alternative (per-constructor `bail!`) drifts the moment a third
1073/// mode is added.
1074pub fn classify_server_runtime_state(
1075    has_tokens: bool,
1076    has_policy: bool,
1077    allow_unauthenticated: bool,
1078) -> Result<ServerRuntimeState> {
1079    match (has_tokens, has_policy, allow_unauthenticated) {
1080        (false, false, false) => bail!(
1081            "server has no bearer tokens and no policy file configured. This is a fully \
1082             open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
1083             if you actually want that, otherwise configure bearer tokens (see \
1084             docs/user/server.md) and/or `policy.file` in omnigraph.yaml."
1085        ),
1086        (false, false, true) => Ok(ServerRuntimeState::Open),
1087        (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
1088        (false, true, _) => bail!(
1089            "policy file is configured but no bearer tokens — every request would 401 \
1090             because no token can ever match. Configure at least one bearer token (see \
1091             docs/user/server.md), or remove the policy file. To deny all unauthenticated \
1092             traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
1093             produces meaningful 403s with policy-decision logging instead of silent 401s."
1094        ),
1095        (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
1096    }
1097}
1098
1099pub fn build_app(state: AppState) -> Router {
1100    // The per-graph protected routes, identical in single + multi mode.
1101    // Two middleware layers wrap them (outer first, inner last):
1102    //   1. `require_bearer_auth` — extracts the bearer token and injects
1103    //      `ResolvedActor` (or rejects 401).
1104    //   2. `resolve_graph_handle` — injects `Arc<GraphHandle>` based on
1105    //      the active mode (single: the only handle; multi: lookup by
1106    //      `{graph_id}` in the URI path).
1107    let per_graph_protected = Router::new()
1108        .route("/snapshot", get(server_snapshot))
1109        .route("/export", post(server_export))
1110        // /read and /change are kept indefinitely for back-compat;
1111        // their handlers carry #[deprecated] so the OpenAPI operation is
1112        // flagged and their responses include RFC 9745 Deprecation +
1113        // RFC 8288 Link headers. Suppress the call-site warning for the
1114        // route registration itself.
1115        .route("/read", post({
1116            #[allow(deprecated)]
1117            server_read
1118        }))
1119        .route("/query", post(server_query))
1120        .route("/change", post({
1121            #[allow(deprecated)]
1122            server_change
1123        }))
1124        .route("/mutate", post(server_mutate))
1125        .route("/queries", get(server_list_queries))
1126        .route("/queries/{name}", post(server_invoke_query))
1127        .route("/schema", get(server_schema_get))
1128        .route("/schema/apply", post(server_schema_apply))
1129        .route(
1130            "/ingest",
1131            post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
1132        )
1133        .route(
1134            "/branches",
1135            get(server_branch_list).post(server_branch_create),
1136        )
1137        .route("/branches/{branch}", delete(server_branch_delete))
1138        .route("/branches/merge", post(server_branch_merge))
1139        .route("/commits", get(server_commit_list))
1140        .route("/commits/{commit_id}", get(server_commit_show))
1141        .route_layer(middleware::from_fn_with_state(
1142            state.clone(),
1143            resolve_graph_handle,
1144        ))
1145        .route_layer(middleware::from_fn_with_state(
1146            state.clone(),
1147            require_bearer_auth,
1148        ));
1149
1150    // Management endpoints (`GET /graphs`) live alongside the per-graph
1151    // router. They go through bearer auth but NOT through
1152    // `resolve_graph_handle` — they operate on the registry directly.
1153    // The endpoint is mounted in both modes; in single mode the handler
1154    // returns 405 so clients see "resource exists, wrong context"
1155    // rather than 404 "no such resource."
1156    //
1157    // Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not
1158    // exposed in v0.6.0 — operators add graphs by editing
1159    // `omnigraph.yaml` and restarting.
1160    let management = Router::new()
1161        .route("/graphs", get(server_graphs_list))
1162        .route_layer(middleware::from_fn_with_state(
1163            state.clone(),
1164            require_bearer_auth,
1165        ));
1166
1167    // Mount the protected routes differently per mode:
1168    //   * Single → flat routes (legacy: `/snapshot`, `/read`, etc.)
1169    //   * Multi  → nested under `/graphs/{graph_id}/...`
1170    let protected: Router<AppState> = match state.routing() {
1171        GraphRouting::Single { .. } => per_graph_protected.merge(management),
1172        GraphRouting::Multi { .. } => Router::new()
1173            .nest("/graphs/{graph_id}", per_graph_protected)
1174            .merge(management),
1175    };
1176
1177    Router::new()
1178        .route("/healthz", get(server_health))
1179        .route("/openapi.json", get(server_openapi))
1180        .merge(protected)
1181        .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
1182        .layer(TraceLayer::new_for_http())
1183        .with_state(state)
1184}
1185
1186pub async fn serve(config: ServerConfig) -> Result<()> {
1187    let token_source = resolve_token_source().await?;
1188    info!(source = token_source.name(), "loaded bearer token source");
1189    let tokens = token_source.load().await?;
1190
1191    // For runtime-state classification, "any policy configured" means
1192    // either the top-level/single-mode policy file OR a server-level
1193    // policy OR any per-graph policy file. Mirrors the
1194    // `requires_bearer_auth` semantics on AppState.
1195    let has_policy_configured = match &config.mode {
1196        ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
1197        ServerConfigMode::Multi {
1198            graphs,
1199            server_policy_file,
1200            ..
1201        } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
1202    };
1203    let runtime_state = classify_server_runtime_state(
1204        !tokens.is_empty(),
1205        has_policy_configured,
1206        config.allow_unauthenticated,
1207    )?;
1208    match runtime_state {
1209        ServerRuntimeState::Open => warn!(
1210            "running with --unauthenticated: no bearer tokens, no policy file, all \
1211             requests permitted. This is for local dev only — do not expose to a \
1212             network you don't fully trust."
1213        ),
1214        ServerRuntimeState::DefaultDeny => warn!(
1215            "bearer tokens are configured but no policy file is set — running in \
1216             default-deny mode (only `read` actions are permitted for authenticated \
1217             actors). Configure `policy.file` in omnigraph.yaml to enable Cedar rules."
1218        ),
1219        ServerRuntimeState::PolicyEnabled => {}
1220    }
1221
1222    let bind = config.bind.clone();
1223    let state = match config.mode {
1224        ServerConfigMode::Single {
1225            uri,
1226            graph_id,
1227            policy_file,
1228            queries,
1229        } => {
1230            let uri_for_log = uri.clone();
1231            info!(
1232                uri = %uri_for_log,
1233                graph_id = %graph_id,
1234                bind = %bind,
1235                mode = "single",
1236                "serving omnigraph"
1237            );
1238            AppState::open_single_with_queries_for_graph_id(
1239                uri,
1240                tokens,
1241                policy_file.as_ref(),
1242                queries,
1243                Some(graph_id),
1244            )
1245            .await?
1246        }
1247        ServerConfigMode::Multi {
1248            graphs,
1249            config_path,
1250            server_policy_file,
1251        } => {
1252            info!(
1253                bind = %bind,
1254                mode = "multi",
1255                graph_count = graphs.len(),
1256                config = %config_path.display(),
1257                "serving omnigraph"
1258            );
1259            open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
1260        }
1261    };
1262
1263    let listener = TcpListener::bind(&bind).await?;
1264    axum::serve(listener, build_app(state))
1265        .with_graceful_shutdown(shutdown_signal())
1266        .await?;
1267    Ok(())
1268}
1269
1270/// Parallel open of every graph in the startup config, with bounded
1271/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
1272/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
1273/// instances close cleanly via Arc drop).
1274///
1275/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
1276/// trades startup latency for a small amount of concurrent S3 / Lance
1277/// open pressure.
1278async fn open_multi_graph_state(
1279    graphs: Vec<GraphStartupConfig>,
1280    tokens: Vec<(String, String)>,
1281    server_policy_file: Option<&PathBuf>,
1282    config_path: PathBuf,
1283) -> Result<AppState> {
1284    use futures::{StreamExt, TryStreamExt};
1285
1286    if graphs.is_empty() {
1287        bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1288    }
1289
1290    // Server-level policy (loaded once, applies to management endpoints).
1291    // The placeholder graph_id `"server"` is the sentinel the Cedar
1292    // resource-model refactor maps to the singleton
1293    // `Omnigraph::Server::"root"` entity at evaluation time.
1294    let server_policy = match server_policy_file {
1295        Some(path) => Some(PolicyEngine::load_server(path)?),
1296        None => None,
1297    };
1298
1299    // `try_collect` propagates the first error eagerly, dropping every
1300    // in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain
1301    // the stream before checking errors — incorrect for the docstring's
1302    // "fail-fast" claim and wasteful on S3-backed graphs.
1303    let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
1304        .map(|cfg| async move { open_single_graph(cfg).await })
1305        .buffer_unordered(4)
1306        .try_collect()
1307        .await?;
1308
1309    let workload = workload::WorkloadController::from_env();
1310    let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1311        .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1312    Ok(state)
1313}
1314
1315/// Open one graph and wrap it in a `GraphHandle`. Used at startup by
1316/// `open_multi_graph_state`.
1317async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1318    let graph_id = GraphId::try_from(cfg.graph_id.clone())
1319        .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1320    let uri = normalize_root_uri(&cfg.uri)
1321        .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1322
1323    let db = Omnigraph::open(&uri)
1324        .await
1325        .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1326
1327    // Validate this graph's stored queries against the live schema and
1328    // resolve them to an attachable handle (refuse boot on breakage).
1329    // Done before the policy match rebinds `db`; the catalog handle is an
1330    // owned `Arc`, so no borrow of `db` survives into the match.
1331    let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
1332
1333    let (policy_arc, db) = match &cfg.policy_file {
1334        Some(path) => {
1335            let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
1336            let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1337            let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1338            (Some(policy_arc), db.with_policy(checker))
1339        }
1340        None => (None, db),
1341    };
1342
1343    Ok(Arc::new(GraphHandle {
1344        key: GraphKey::cluster(graph_id),
1345        uri,
1346        engine: Arc::new(db),
1347        policy: policy_arc,
1348        queries,
1349    }))
1350}
1351
1352async fn shutdown_signal() {
1353    if let Err(err) = tokio::signal::ctrl_c().await {
1354        error!(error = %err, "failed to install ctrl-c handler");
1355        return;
1356    }
1357    info!("shutdown signal received");
1358}
1359
1360#[utoipa::path(
1361    get,
1362    path = "/healthz",
1363    tag = "health",
1364    operation_id = "health",
1365    responses(
1366        (status = 200, description = "Server is healthy", body = HealthOutput),
1367    ),
1368)]
1369/// Liveness probe.
1370///
1371/// Returns server status and version. Unauthenticated; safe to call from any
1372/// caller. Use this to confirm the server is reachable before invoking other
1373/// endpoints.
1374async fn server_health() -> Json<HealthOutput> {
1375    Json(HealthOutput {
1376        status: "ok".to_string(),
1377        version: SERVER_VERSION.to_string(),
1378        source_version: SERVER_SOURCE_VERSION.map(str::to_string),
1379    })
1380}
1381
1382#[utoipa::path(
1383    get,
1384    path = "/graphs",
1385    tag = "management",
1386    operation_id = "listGraphs",
1387    responses(
1388        (status = 200, description = "List of registered graphs", body = GraphListResponse),
1389        (status = 401, description = "Unauthorized", body = ErrorOutput),
1390        (status = 403, description = "Forbidden", body = ErrorOutput),
1391        (status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput),
1392    ),
1393    security(("bearer_token" = [])),
1394)]
1395/// List every graph currently registered with this server (MR-668).
1396///
1397/// Multi-graph mode only. In single mode, the route returns 405 — there's
1398/// no registry to enumerate. Cedar-gated by the server-level policy via
1399/// the `graph_list` action against `Omnigraph::Server::"root"`.
1400///
1401/// Order: alphabetical by `graph_id` (server-sorted so clients see
1402/// deterministic output across requests).
1403async fn server_graphs_list(
1404    State(state): State<AppState>,
1405    actor: Option<Extension<ResolvedActor>>,
1406) -> std::result::Result<Json<GraphListResponse>, ApiError> {
1407    // 405 in single mode — there's no registry to enumerate, and the
1408    // legacy URL surface didn't expose this endpoint.
1409    let registry = match state.routing() {
1410        GraphRouting::Single { .. } => {
1411            return Err(ApiError::method_not_allowed(
1412                "GET /graphs is only available in multi-graph mode",
1413            ));
1414        }
1415        GraphRouting::Multi { registry, .. } => registry,
1416    };
1417
1418    // Server-level Cedar gate. `state.server_policy` is loaded from
1419    // `server.policy.file` in `omnigraph.yaml` at startup. When no
1420    // server policy is configured, `authorize_request_server` falls
1421    // through to the MR-723 default-deny semantics (every non-Read
1422    // action denied for an authenticated actor). `GraphList` is not
1423    // `Read`, so without a server policy the request gets 403 — which
1424    // is the right default (don't leak the registry until the operator
1425    // explicitly authorizes it).
1426    authorize_request(
1427        actor.as_ref().map(|Extension(actor)| actor),
1428        state.server_policy.as_deref(),
1429        PolicyRequest {
1430            action: PolicyAction::GraphList,
1431            branch: None,
1432            target_branch: None,
1433        },
1434    )?;
1435
1436    let mut graphs: Vec<GraphInfo> = registry
1437        .list()
1438        .into_iter()
1439        .map(|handle| GraphInfo {
1440            graph_id: handle.key.graph_id.as_str().to_string(),
1441            uri: handle.uri.clone(),
1442        })
1443        .collect();
1444    graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
1445    Ok(Json(GraphListResponse { graphs }))
1446}
1447
1448async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
1449    let mut doc = ApiDoc::openapi();
1450    if !state.requires_bearer_auth() {
1451        strip_security(&mut doc);
1452    }
1453    // MR-668: in multi mode, the protected routes live under
1454    // `/graphs/{graph_id}/...`. Rewrite the doc so the spec matches
1455    // the routes the router actually serves. Public paths (`/healthz`)
1456    // stay flat in both modes.
1457    if matches!(state.routing(), GraphRouting::Multi { .. }) {
1458        nest_paths_under_cluster_prefix(&mut doc);
1459    }
1460    Json(doc)
1461}
1462
1463/// Path prefix used to namespace per-graph routes in multi mode.
1464/// Kept in sync with the `Router::nest(...)` invocation in `build_app`.
1465const CLUSTER_PATH_PREFIX: &str = "/graphs/{graph_id}";
1466
1467/// Operation-id prefix applied to every cloned cluster operation.
1468/// Decision 7 in the implementation plan — keeps operation IDs unique
1469/// across the spec when both flat and nested variants ever appear in
1470/// the same generation pass.
1471const CLUSTER_OPERATION_ID_PREFIX: &str = "cluster_";
1472
/// Paths that are never nested under the cluster prefix, whatever the
/// server mode: they are public or server-level and do not depend on a
/// specific graph. Extend this list when a new always-flat endpoint is
/// added.
///
/// `/graphs` is the management enumeration. It lives at the root in both
/// single mode (405) and multi mode, and must never be rewritten to
/// `/graphs/{graph_id}/graphs`.
const ALWAYS_FLAT_PATHS: &[&str] = &["/healthz", "/graphs"];
1479
1480/// In multi-mode `server_openapi`, every protected path-item is
1481/// reattached under the cluster prefix. Operation IDs gain the
1482/// `cluster_` prefix so SDK generators don't collide if/when both
1483/// surfaces are merged. Every rewritten operation also declares the
1484/// required `{graph_id}` path parameter so the served OpenAPI document
1485/// remains internally valid.
1486///
1487/// Removing the flat protected paths matches the runtime router —
1488/// in multi mode, requests to `/snapshot` etc. return 404, so the
1489/// spec must agree.
1490fn nest_paths_under_cluster_prefix(doc: &mut utoipa::openapi::OpenApi) {
1491    let original = std::mem::take(&mut doc.paths.paths);
1492    let mut rewritten = std::collections::BTreeMap::new();
1493    for (path, mut item) in original {
1494        if ALWAYS_FLAT_PATHS.contains(&path.as_str()) {
1495            rewritten.insert(path, item);
1496            continue;
1497        }
1498        rename_operation_ids(&mut item, CLUSTER_OPERATION_ID_PREFIX);
1499        add_cluster_graph_id_parameter(&mut item);
1500        let new_path = format!("{CLUSTER_PATH_PREFIX}{path}");
1501        rewritten.insert(new_path, item);
1502    }
1503    doc.paths.paths = rewritten;
1504}
1505
1506fn add_cluster_graph_id_parameter(item: &mut utoipa::openapi::PathItem) {
1507    for op in path_item_operations_mut(item) {
1508        let parameters = op.parameters.get_or_insert_with(Vec::new);
1509        let has_graph_id = parameters
1510            .iter()
1511            .any(|param| param.name == "graph_id" && param.parameter_in == ParameterIn::Path);
1512        if !has_graph_id {
1513            parameters.insert(0, graph_id_path_parameter());
1514        }
1515    }
1516}
1517
1518fn graph_id_path_parameter() -> Parameter {
1519    let mut parameter = Parameter::new("graph_id");
1520    parameter.parameter_in = ParameterIn::Path;
1521    parameter.description = Some("Graph id to route the request to.".to_string());
1522    parameter.schema = Some(Object::with_type(Type::String).into());
1523    parameter
1524}
1525
1526/// Prefix every operation_id in this PathItem with `prefix`.
1527fn rename_operation_ids(item: &mut utoipa::openapi::PathItem, prefix: &str) {
1528    for op in path_item_operations_mut(item) {
1529        if let Some(id) = op.operation_id.as_deref() {
1530            op.operation_id = Some(format!("{prefix}{id}"));
1531        }
1532    }
1533}
1534
1535fn path_item_operations_mut(
1536    item: &mut utoipa::openapi::PathItem,
1537) -> impl Iterator<Item = &mut utoipa::openapi::path::Operation> {
1538    [
1539        item.get.as_mut(),
1540        item.post.as_mut(),
1541        item.put.as_mut(),
1542        item.delete.as_mut(),
1543        item.options.as_mut(),
1544        item.head.as_mut(),
1545        item.patch.as_mut(),
1546        item.trace.as_mut(),
1547    ]
1548    .into_iter()
1549    .flatten()
1550}
1551
1552fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
1553    if let Some(components) = doc.components.as_mut() {
1554        components.security_schemes.clear();
1555    }
1556    for path_item in doc.paths.paths.values_mut() {
1557        for op in [
1558            path_item.get.as_mut(),
1559            path_item.post.as_mut(),
1560            path_item.put.as_mut(),
1561            path_item.delete.as_mut(),
1562            path_item.options.as_mut(),
1563            path_item.head.as_mut(),
1564            path_item.patch.as_mut(),
1565            path_item.trace.as_mut(),
1566        ]
1567        .into_iter()
1568        .flatten()
1569        {
1570            op.security = None;
1571        }
1572    }
1573}
1574
1575async fn require_bearer_auth(
1576    State(state): State<AppState>,
1577    mut request: Request,
1578    next: Next,
1579) -> std::result::Result<Response, ApiError> {
1580    if !state.requires_bearer_auth() {
1581        return Ok(next.run(request).await);
1582    }
1583
1584    let Some(header) = request
1585        .headers()
1586        .get(AUTHORIZATION)
1587        .and_then(|value| value.to_str().ok())
1588    else {
1589        return Err(ApiError::unauthorized("missing bearer token"));
1590    };
1591
1592    let Some(provided_token) = header.strip_prefix("Bearer ") else {
1593        return Err(ApiError::unauthorized("missing bearer token"));
1594    };
1595
1596    let Some(actor) = state.authenticate_bearer_token(provided_token) else {
1597        return Err(ApiError::unauthorized("invalid bearer token"));
1598    };
1599    request.extensions_mut().insert(actor);
1600
1601    Ok(next.run(request).await)
1602}
1603
1604/// Routing middleware (MR-668). Resolves the active graph for the
1605/// request and injects `Arc<GraphHandle>` as an extension so handlers can
1606/// extract it via `Extension<Arc<GraphHandle>>`.
1607///
1608/// **Single mode**: the routing field holds the single handle directly.
1609/// Routes are flat; every request resolves to that handle, regardless
1610/// of the URI path. No registry walk, no sentinel key, no
1611/// programmer-error guard.
1612///
1613/// **Multi mode**: routes are nested under `/graphs/{graph_id}/...`. The
1614/// middleware extracts `{graph_id}` from the URI path and looks it up in
1615/// the registry. Returns 404 if the graph is not registered.
1616///
1617/// The middleware fires AFTER `require_bearer_auth`, so the actor is
1618/// already in the request extensions (or auth was off entirely).
1619async fn resolve_graph_handle(
1620    State(state): State<AppState>,
1621    mut request: Request,
1622    next: Next,
1623) -> std::result::Result<Response, ApiError> {
1624    let handle = match &state.routing {
1625        GraphRouting::Single { handle } => Arc::clone(handle),
1626        GraphRouting::Multi { registry, .. } => {
1627            // `Router::nest("/graphs/{graph_id}", inner)` rewrites
1628            // `request.uri().path()` to the inner suffix (e.g. `/snapshot`).
1629            // The pre-rewrite URI is preserved in the `OriginalUri`
1630            // request extension by axum's router; we read from there to
1631            // extract `{graph_id}`. Fall back to the current URI only if
1632            // the extension is missing, which shouldn't happen for
1633            // nested routes but is safe defensive code.
1634            let original_path: String = request
1635                .extensions()
1636                .get::<OriginalUri>()
1637                .map(|OriginalUri(uri)| uri.path().to_string())
1638                .unwrap_or_else(|| request.uri().path().to_string());
1639            let graph_id_str = original_path
1640                .strip_prefix("/graphs/")
1641                .and_then(|rest| rest.split('/').next())
1642                .filter(|s| !s.is_empty())
1643                .ok_or_else(|| {
1644                    ApiError::bad_request(
1645                        "cluster route missing /graphs/{graph_id} prefix".to_string(),
1646                    )
1647                })?;
1648            let graph_id = GraphId::try_from(graph_id_str.to_string())
1649                .map_err(|err| ApiError::bad_request(err.to_string()))?;
1650            let key = GraphKey::cluster(graph_id.clone());
1651            match registry.get(&key) {
1652                RegistryLookup::Ready(handle) => handle,
1653                RegistryLookup::Gone => {
1654                    return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
1655                }
1656            }
1657        }
1658    };
1659
1660    // Per-request observability. `Span::current().record` would silently
1661    // no-op here because no upstream `#[tracing::instrument(...)]` macro
1662    // declares a `graph_id` field; emit an explicit event instead so the
1663    // routing decision actually lands in logs.
1664    info!(graph_id = %handle.key.graph_id, "graph routed");
1665
1666    request.extensions_mut().insert(handle);
1667    Ok(next.run(request).await)
1668}
1669
1670fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
1671    info!(
1672        actor_id = actor_id,
1673        action = %request.action,
1674        branch = request.branch.as_deref().unwrap_or(""),
1675        target_branch = request.target_branch.as_deref().unwrap_or(""),
1676        allowed = decision.allowed,
1677        matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
1678        "policy decision"
1679    );
1680}
1681
/// Outcome of an authorization check: the allow/deny **decision** only.
///
/// Operational failures that happen while computing the decision are
/// reported separately as `Err`, never as a variant here. That split lets
/// [`authorize_request`] turn `Denied` into a 403, while a caller that has
/// to remap a denial (the stored-query invoke handler hides it as a 404)
/// can match on this type directly. A real 401 (missing bearer) or 500
/// (policy-evaluation error) then keeps its true status instead of being
/// masked as the denial's response.
enum Authz {
    /// The request may proceed.
    Allowed,
    /// The request is refused; the payload is the denial message.
    Denied(String),
}
1693
1694/// HTTP-layer Cedar policy gate, returning the allow/deny [`Authz`] decision
1695/// and reserving `Err` for operational failures (401 missing bearer, 500
1696/// policy-evaluation error). Two sources of the policy engine:
1697///   * Per-graph handler — passes `handle.policy.as_deref()` so the
1698///     graph's Cedar rules govern read/change/branch_*/schema_apply.
1699///   * Management handler — passes `state.server_policy.as_deref()` so
1700///     server-level Cedar rules govern `graph_list` (the only shipped
1701///     server-scoped action; runtime `graph_create` / `graph_delete`
1702///     are deferred until a managed cluster catalog lands).
1703///
1704/// The MR-731 invariant lives inside this function: actor identity is
1705/// supplied as a separate argument from the resolved bearer match. The
1706/// `PolicyRequest` struct itself does not carry identity (the field was
1707/// dropped from the type), so handlers cannot smuggle it through the
1708/// request. See `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
1709/// at `tests/server.rs`.
1710fn authorize(
1711    actor: Option<&ResolvedActor>,
1712    policy: Option<&PolicyEngine>,
1713    request: PolicyRequest,
1714) -> std::result::Result<Authz, ApiError> {
1715    let Some(engine) = policy else {
1716        // No PolicyEngine installed. Three runtime states can reach this:
1717        //
1718        // * **Open mode** (`--unauthenticated`): no tokens, no policy.
1719        //   Per-graph operations are open by operator opt-in (they
1720        //   accepted "trust the network" for graph data).
1721        // * **DefaultDeny mode**: tokens configured but no policy. The
1722        //   request went through bearer auth, so `actor` is Some. Only
1723        //   per-graph `Read` is permitted; other per-graph actions
1724        //   return 403. Closes the "configured auth but forgot the
1725        //   policy file" trap from MR-723.
1726        // * Either of the above with a **server-scoped** action
1727        //   (`graph_list`, future `graph_create`/`graph_delete`).
1728        //
1729        // Server-scoped actions are always denied here, regardless of
1730        // mode or actor presence. The management surface leaks server
1731        // topology (graph IDs + URIs that may contain S3 bucket paths
1732        // or internal hostnames) — operators who opted into Open mode
1733        // accepted exposure of graph DATA, not exposure of server
1734        // topology. Closing the management surface by default in every
1735        // runtime state means the docstring contract on
1736        // `server_graphs_list` ("don't leak the registry until the
1737        // operator explicitly authorizes it") holds uniformly; the
1738        // operator's only path to enabling it is configuring an
1739        // explicit `server.policy.file` in omnigraph.yaml.
1740        if request.action.resource_kind() == PolicyResourceKind::Server {
1741            return Ok(Authz::Denied(
1742                "server-scoped actions require an explicit `server.policy.file` \
1743                 configured in omnigraph.yaml — the management surface is closed \
1744                 by default in every runtime state, including --unauthenticated, \
1745                 so that server topology is never exposed without operator opt-in."
1746                    .to_string(),
1747            ));
1748        }
1749        if actor.is_some() && request.action != PolicyAction::Read {
1750            return Ok(Authz::Denied(
1751                "server runs in default-deny mode (bearer tokens configured but no \
1752                 policy file). Only `read` actions are permitted; configure \
1753                 `policy.file` in omnigraph.yaml to enable other actions."
1754                    .to_string(),
1755            ));
1756        }
1757        return Ok(Authz::Allowed);
1758    };
1759    let Some(actor) = actor else {
1760        return Err(ApiError::unauthorized("missing bearer token"));
1761    };
1762    // SECURITY INVARIANT (MR-731): actor identity is supplied to the
1763    // policy engine here as a separate argument, sourced from the
1764    // bearer-token match resolved by `require_bearer_auth`. The
1765    // `PolicyRequest` struct itself no longer carries `actor_id` (it
1766    // was dropped from the type), so handlers cannot smuggle identity
1767    // through the request body and there is no overwrite step that
1768    // could be skipped. The principle is codified in
1769    // `docs/dev/invariants.md` Hard Invariant 11 ("clients cannot set
1770    // actor identity directly") and pinned by the regression test
1771    // `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
1772    // in `crates/omnigraph-server/tests/server.rs`.
1773    let actor_id = actor.actor_id.as_ref();
1774    let decision = engine
1775        .authorize(actor_id, &request)
1776        .map_err(|err| ApiError::internal(format!("policy: {err}")))?;
1777    log_policy_decision(actor_id, &request, &decision);
1778    if decision.allowed {
1779        Ok(Authz::Allowed)
1780    } else {
1781        Ok(Authz::Denied(decision.message))
1782    }
1783}
1784
1785/// Thin wrapper over [`authorize`] for the handlers that treat any denial as a
1786/// 403: a denial becomes `ApiError::forbidden`, and operational failures
1787/// (401 missing bearer, 500 policy-evaluation error) propagate unchanged. The
1788/// stored-query invoke handler does **not** use this — it consumes the
1789/// [`Authz`] decision directly to hide a denial as a 404 while letting an
1790/// operational failure keep its true status.
1791fn authorize_request(
1792    actor: Option<&ResolvedActor>,
1793    policy: Option<&PolicyEngine>,
1794    request: PolicyRequest,
1795) -> std::result::Result<(), ApiError> {
1796    match authorize(actor, policy, request)? {
1797        Authz::Allowed => Ok(()),
1798        Authz::Denied(message) => Err(ApiError::forbidden(message)),
1799    }
1800}
1801
1802#[utoipa::path(
1803    get,
1804    path = "/snapshot",
1805    tag = "snapshots",
1806    operation_id = "getSnapshot",
1807    params(SnapshotQuery),
1808    responses(
1809        (status = 200, description = "Database snapshot", body = api::SnapshotOutput),
1810        (status = 401, description = "Unauthorized", body = ErrorOutput),
1811        (status = 403, description = "Forbidden", body = ErrorOutput),
1812    ),
1813    security(("bearer_token" = [])),
1814)]
1815/// Read the current snapshot of a branch.
1816///
1817/// Returns the manifest version plus per-table metadata (path, version, row
1818/// count) for every table on the branch. Defaults to `main` when `branch` is
1819/// omitted. Read-only.
1820async fn server_snapshot(
1821    Extension(handle): Extension<Arc<GraphHandle>>,
1822    actor: Option<Extension<ResolvedActor>>,
1823    Query(query): Query<SnapshotQuery>,
1824) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
1825    let branch = query.branch.unwrap_or_else(|| "main".to_string());
1826    authorize_request(
1827        actor.as_ref().map(|Extension(actor)| actor),
1828        handle.policy.as_deref(),
1829        PolicyRequest {
1830            action: PolicyAction::Read,
1831            branch: Some(branch.clone()),
1832            target_branch: None,
1833        },
1834    )?;
1835    let snapshot = {
1836        let db = &handle.engine;
1837        db.snapshot_of(ReadTarget::branch(branch.as_str()))
1838            .await
1839            .map_err(ApiError::from_omni)?
1840    };
1841    Ok(Json(snapshot_payload(&branch, &snapshot)))
1842}
1843
1844/// Header values that flag a response as coming from a deprecated route
1845/// (RFC 9745 / RFC 8288) and point at the canonical successor.
1846fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValue); 2] {
1847    [
1848        (
1849            HeaderName::from_static("deprecation"),
1850            HeaderValue::from_static("true"),
1851        ),
1852        (
1853            HeaderName::from_static("link"),
1854            HeaderValue::from_static(successor_link),
1855        ),
1856    ]
1857}
1858
1859#[utoipa::path(
1860    post,
1861    path = "/read",
1862    tag = "queries",
1863    operation_id = "read",
1864    request_body = ReadRequest,
1865    responses(
1866        (status = 200, description = "Query results (response includes `Deprecation: true` + `Link: </query>; rel=\"successor-version\"`)", body = ReadOutput),
1867        (status = 400, description = "Bad request", body = ErrorOutput),
1868        (status = 401, description = "Unauthorized", body = ErrorOutput),
1869        (status = 403, description = "Forbidden", body = ErrorOutput),
1870    ),
1871    security(("bearer_token" = [])),
1872)]
1873#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
1874/// **Deprecated** — use [`POST /query`](#tag/queries/operation/query) instead.
1875///
1876/// Execute a GQ read query. Behavior is unchanged from prior releases; the
1877/// route is kept indefinitely for byte-stable back-compat. New integrations
1878/// should target `POST /query`, which has clean field names (`query` /
1879/// `name`) and a 400-on-mutation guard. Responses from this route include
1880/// `Deprecation: true` and `Link: </query>; rel="successor-version"`
1881/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
1882/// signal.
1883async fn server_read(
1884    Extension(handle): Extension<Arc<GraphHandle>>,
1885    actor: Option<Extension<ResolvedActor>>,
1886    Json(request): Json<ReadRequest>,
1887) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ReadOutput>), ApiError> {
1888    let (selected_name, target, result) = run_query(
1889        handle,
1890        actor.as_ref().map(|Extension(actor)| actor),
1891        &request.query_source,
1892        request.query_name.as_deref(),
1893        request.params.as_ref(),
1894        request.branch,
1895        request.snapshot,
1896        false, // /read predates the D2 rule; legacy callers may submit mutating queries here
1897    )
1898    .await?;
1899    Ok((
1900        deprecation_headers("</query>; rel=\"successor-version\""),
1901        Json(api::read_output(selected_name, &target, result)),
1902    ))
1903}
1904
1905#[utoipa::path(
1906    post,
1907    path = "/query",
1908    tag = "queries",
1909    operation_id = "query",
1910    request_body = QueryRequest,
1911    responses(
1912        (status = 200, description = "Query results", body = ReadOutput),
1913        (status = 400, description = "Bad request - also returned when the query body contains mutations; use POST /mutate (or its deprecated alias POST /change) for write queries", body = ErrorOutput),
1914        (status = 401, description = "Unauthorized", body = ErrorOutput),
1915        (status = 403, description = "Forbidden", body = ErrorOutput),
1916    ),
1917    security(("bearer_token" = [])),
1918)]
1919/// Execute an inline read query (friendlier-named alternative to `POST /read`).
1920///
1921/// Designed for ad-hoc exploration and AI-agent tool-use: short field
1922/// names (`query`, `name`) match the CLI `-e` flag and the GQ `query`
1923/// keyword. Mutations (`insert`/`update`/`delete`) are rejected with 400
1924/// -- use `POST /mutate` (or its deprecated alias `POST /change`) for
1925/// write queries. Otherwise behaves identically to `POST /read`: same
1926/// target semantics (branch xor snapshot), same Cedar action (Read),
1927/// same response shape.
1928async fn server_query(
1929    Extension(handle): Extension<Arc<GraphHandle>>,
1930    actor: Option<Extension<ResolvedActor>>,
1931    Json(request): Json<QueryRequest>,
1932) -> std::result::Result<Json<ReadOutput>, ApiError> {
1933    let (selected_name, target, result) = run_query(
1934        handle,
1935        actor.as_ref().map(|Extension(actor)| actor),
1936        &request.query,
1937        request.name.as_deref(),
1938        request.params.as_ref(),
1939        request.branch,
1940        request.snapshot,
1941        true, // /query is read-only; reject mutations
1942    )
1943    .await?;
1944    Ok(Json(api::read_output(selected_name, &target, result)))
1945}
1946
1947#[utoipa::path(
1948    post,
1949    path = "/export",
1950    tag = "queries",
1951    operation_id = "export",
1952    request_body = ExportRequest,
1953    responses(
1954        (status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
1955        (status = 400, description = "Bad request", body = ErrorOutput),
1956        (status = 401, description = "Unauthorized", body = ErrorOutput),
1957        (status = 403, description = "Forbidden", body = ErrorOutput),
1958    ),
1959    security(("bearer_token" = [])),
1960)]
1961/// Stream the contents of a branch as NDJSON.
1962///
1963/// Emits one JSON object per line (`application/x-ndjson`). Filter with
1964/// `type_names` (node/edge type names) and/or `table_keys`; both empty
1965/// streams the entire branch. Suitable for large exports — the response is
1966/// streamed, not buffered. Read-only.
1967async fn server_export(
1968    Extension(handle): Extension<Arc<GraphHandle>>,
1969    actor: Option<Extension<ResolvedActor>>,
1970    Json(request): Json<ExportRequest>,
1971) -> std::result::Result<Response, ApiError> {
1972    let branch = request.branch.unwrap_or_else(|| "main".to_string());
1973    authorize_request(
1974        actor.as_ref().map(|Extension(actor)| actor),
1975        handle.policy.as_deref(),
1976        PolicyRequest {
1977            action: PolicyAction::Export,
1978            branch: Some(branch.clone()),
1979            target_branch: None,
1980        },
1981    )?;
1982    let engine = Arc::clone(&handle.engine);
1983    let type_names = request.type_names.clone();
1984    let table_keys = request.table_keys.clone();
1985    let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
1986    tokio::spawn(async move {
1987        let result = {
1988            let mut writer = ExportStreamWriter { sender: tx.clone() };
1989            engine
1990                .export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
1991                .await
1992        };
1993        if let Err(err) = result {
1994            let _ = tx.send(Err(io::Error::other(err.to_string())));
1995        }
1996    });
1997    let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
1998        rx.recv().await.map(|item| (item, rx))
1999    }));
2000    Ok((
2001        StatusCode::OK,
2002        [(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
2003        body,
2004    )
2005        .into_response())
2006}
2007
2008/// Shared implementation behind `POST /mutate` (canonical) and
2009/// `POST /change` (deprecated alias). Returns the bare `ChangeOutput`;
2010/// each route handler wraps it (the alias also attaches Deprecation
2011/// headers).
2012/// Shared backend for `/mutate` (canonical) and `/change` (deprecated alias).
2013///
2014/// Decoupled from `ChangeRequest` so MR-969's `/queries/{name}` stored-query
2015/// handler can call this directly with registry-supplied fields without
2016/// rebuilding the request body. Today's HTTP handlers unpack the request and
2017/// call here; the registry would do the same.
2018async fn run_mutate(
2019    state: AppState,
2020    handle: Arc<GraphHandle>,
2021    actor: Option<&ResolvedActor>,
2022    query: &str,
2023    name: Option<&str>,
2024    params_json: Option<&Value>,
2025    branch: String,
2026) -> std::result::Result<ChangeOutput, ApiError> {
2027    let actor_arc = actor
2028        .map(|a| Arc::clone(&a.actor_id))
2029        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2030    let actor_id = actor.map(|a| a.actor_id.as_ref());
2031    authorize_request(
2032        actor,
2033        handle.policy.as_deref(),
2034        PolicyRequest {
2035            action: PolicyAction::Change,
2036            branch: Some(branch.clone()),
2037            target_branch: None,
2038        },
2039    )?;
2040    // Per-actor admission: bound concurrent in-flight mutations and
2041    // estimated bytes per actor. Cedar runs FIRST so denied requests
2042    // don't consume admission slots. Estimate uses the request body
2043    // size as a coarse proxy; engine memory pressure can run higher.
2044    let est_bytes = query.len() as u64
2045        + params_json
2046            .map(|p| p.to_string().len() as u64)
2047            .unwrap_or(0);
2048    let _admission = state
2049        .workload
2050        .try_admit(&actor_arc, est_bytes)
2051        .map_err(ApiError::from_workload_reject)?;
2052    let (selected_name, query_params) =
2053        select_named_query(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
2054    let params = query_params_from_json(&query_params, params_json)
2055        .map_err(|err| ApiError::bad_request(err.to_string()))?;
2056
2057    let result = {
2058        let db = &handle.engine;
2059        db.mutate_as(&branch, query, &selected_name, &params, actor_id)
2060            .await
2061            .map_err(ApiError::from_omni)?
2062    };
2063    Ok(ChangeOutput {
2064        branch,
2065        query_name: selected_name,
2066        affected_nodes: result.affected_nodes,
2067        affected_edges: result.affected_edges,
2068        actor_id: actor_id.map(str::to_string),
2069    })
2070}
2071
2072/// Shared backend for `/query` (canonical) and `/read` (deprecated alias).
2073///
2074/// Mirrors [`run_mutate`]'s decoupled shape so MR-969's stored-query handler
2075/// can call here with registry-supplied fields. Rejects inline source that
2076/// contains mutations (D2 rule); callers wanting writes go through
2077/// [`run_mutate`] instead.
2078///
2079/// Intentionally does **not** take [`AppState`] (unlike [`run_mutate`]):
2080/// reads are not admission-gated today, so there is no `state.workload`
2081/// consumer. The signature grows the parameter when Phase 1 (MR-976) adds
2082/// the request envelope's `expect: { max_rows_scanned: N }` budget, or
2083/// MR-969 extends per-actor admission to stored-read invocations.
2084async fn run_query(
2085    handle: Arc<GraphHandle>,
2086    actor: Option<&ResolvedActor>,
2087    query: &str,
2088    name: Option<&str>,
2089    params_json: Option<&Value>,
2090    branch: Option<String>,
2091    snapshot: Option<String>,
2092    reject_mutations: bool,
2093) -> std::result::Result<(String, ReadTarget, omnigraph_compiler::result::QueryResult), ApiError> {
2094    if branch.is_some() && snapshot.is_some() {
2095        return Err(ApiError::bad_request(
2096            "request may specify branch or snapshot, not both",
2097        ));
2098    }
2099
2100    let target = read_target_from_request(branch, snapshot);
2101    let policy_branch = match &target {
2102        ReadTarget::Branch(branch) => Some(branch.clone()),
2103        ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
2104            let db = &handle.engine;
2105            db.resolved_branch_of(target.clone())
2106                .await
2107                .map(|branch| branch.or_else(|| Some("main".to_string())))
2108                .map_err(ApiError::from_omni)?
2109        }
2110        ReadTarget::Snapshot(_) => None,
2111    };
2112    authorize_request(
2113        actor,
2114        handle.policy.as_deref(),
2115        PolicyRequest {
2116            action: PolicyAction::Read,
2117            branch: policy_branch,
2118            target_branch: None,
2119        },
2120    )?;
2121    let query_decl =
2122        select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
2123    if reject_mutations && !query_decl.mutations.is_empty() {
2124        return Err(ApiError::bad_request(format!(
2125            "query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
2126            query_decl.name
2127        )));
2128    }
2129    let selected_name = query_decl.name.clone();
2130    let params = query_params_from_json(&query_decl.params, params_json)
2131        .map_err(|err| ApiError::bad_request(err.to_string()))?;
2132
2133    let result = {
2134        let db = &handle.engine;
2135        db.query(target.clone(), query, &selected_name, &params)
2136            .await
2137            .map_err(ApiError::from_omni)?
2138    };
2139    Ok((selected_name, target, result))
2140}
2141
2142#[utoipa::path(
2143    post,
2144    path = "/change",
2145    tag = "mutations",
2146    operation_id = "change",
2147    request_body = ChangeRequest,
2148    responses(
2149        (status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: </mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
2150        (status = 400, description = "Bad request", body = ErrorOutput),
2151        (status = 401, description = "Unauthorized", body = ErrorOutput),
2152        (status = 403, description = "Forbidden", body = ErrorOutput),
2153        (status = 409, description = "Merge conflict", body = ErrorOutput),
2154        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2155    ),
2156    security(("bearer_token" = [])),
2157)]
2158#[deprecated(note = "use POST /mutate instead; /change is kept indefinitely for back-compat")]
2159/// **Deprecated** — use [`POST /mutate`](#tag/mutations/operation/mutate) instead.
2160///
2161/// Apply a GQ mutation to a branch. Behavior is unchanged; the route is
2162/// kept indefinitely for back-compat. New integrations should target
2163/// `POST /mutate`, which has identical semantics and a name that pairs
2164/// cleanly with `POST /query`. Responses from this route include
2165/// `Deprecation: true` and `Link: </mutate>; rel="successor-version"`
2166/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
2167/// signal.
2168async fn server_change(
2169    State(state): State<AppState>,
2170    Extension(handle): Extension<Arc<GraphHandle>>,
2171    actor: Option<Extension<ResolvedActor>>,
2172    Json(request): Json<ChangeRequest>,
2173) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ChangeOutput>), ApiError> {
2174    let branch = request.branch.unwrap_or_else(|| "main".to_string());
2175    let output = run_mutate(
2176        state,
2177        handle,
2178        actor.as_ref().map(|Extension(actor)| actor),
2179        &request.query,
2180        request.name.as_deref(),
2181        request.params.as_ref(),
2182        branch,
2183    )
2184    .await?;
2185    Ok((
2186        deprecation_headers("</mutate>; rel=\"successor-version\""),
2187        Json(output),
2188    ))
2189}
2190
2191#[utoipa::path(
2192    post,
2193    path = "/mutate",
2194    tag = "mutations",
2195    operation_id = "mutate",
2196    request_body = ChangeRequest,
2197    responses(
2198        (status = 200, description = "Mutation results", body = ChangeOutput),
2199        (status = 400, description = "Bad request", body = ErrorOutput),
2200        (status = 401, description = "Unauthorized", body = ErrorOutput),
2201        (status = 403, description = "Forbidden", body = ErrorOutput),
2202        (status = 409, description = "Merge conflict", body = ErrorOutput),
2203        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2204    ),
2205    security(("bearer_token" = [])),
2206)]
2207/// Apply a GQ mutation to a branch (canonical mutation endpoint).
2208///
2209/// Writes to the named `branch` (defaults to `main`). Mutations are atomic
2210/// per call and produce a new commit. Returns counts of nodes and edges
2211/// affected. **Destructive**: on success the branch is updated; rejected
2212/// mutations may still acquire locks briefly. Returns 409 on merge conflict.
2213///
2214/// Pairs with `POST /query` (read-only). The legacy `POST /change` route
2215/// has identical semantics and is kept as a deprecated alias.
2216async fn server_mutate(
2217    State(state): State<AppState>,
2218    Extension(handle): Extension<Arc<GraphHandle>>,
2219    actor: Option<Extension<ResolvedActor>>,
2220    Json(request): Json<ChangeRequest>,
2221) -> std::result::Result<Json<ChangeOutput>, ApiError> {
2222    let branch = request.branch.unwrap_or_else(|| "main".to_string());
2223    Ok(Json(
2224        run_mutate(
2225            state,
2226            handle,
2227            actor.as_ref().map(|Extension(actor)| actor),
2228            &request.query,
2229            request.name.as_deref(),
2230            request.params.as_ref(),
2231            branch,
2232        )
2233        .await?,
2234    ))
2235}
2236
2237/// Path parameter for `POST /queries/{name}`.
2238#[derive(Deserialize)]
2239struct QueryNamePath {
2240    name: String,
2241}
2242
2243fn parse_optional_invoke_body(
2244    body: Bytes,
2245) -> std::result::Result<InvokeStoredQueryRequest, ApiError> {
2246    if body.is_empty() {
2247        return Ok(InvokeStoredQueryRequest::default());
2248    }
2249    serde_json::from_slice::<Option<InvokeStoredQueryRequest>>(&body)
2250        .map(|request| request.unwrap_or_default())
2251        .map_err(|err| {
2252            ApiError::bad_request(format!("invalid stored-query invocation body: {err}"))
2253        })
2254}
2255
2256#[utoipa::path(
2257    post,
2258    path = "/queries/{name}",
2259    tag = "queries",
2260    operation_id = "invoke_query",
2261    params(("name" = String, Path, description = "Stored query name (the registry key)")),
2262    request_body = Option<InvokeStoredQueryRequest>,
2263    responses(
2264        (status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse),
2265        (status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput),
2266        (status = 401, description = "Unauthorized", body = ErrorOutput),
2267        (status = 403, description = "Forbidden (the inner `change` gate for a stored mutation)", body = ErrorOutput),
2268        (status = 404, description = "Unknown stored query, or `invoke_query` denied — indistinguishable to a caller without the grant", body = ErrorOutput),
2269        (status = 409, description = "Merge conflict", body = ErrorOutput),
2270        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2271        (status = 500, description = "Policy evaluation error (a denial is reported as 404, not 500)", body = ErrorOutput),
2272    ),
2273    security(("bearer_token" = [])),
2274)]
2275/// Invoke a curated, server-side stored query by name.
2276///
2277/// The query source comes from the graph's `queries:` registry, not the
2278/// request body — callers send only runtime inputs (`params`, `branch`,
2279/// `snapshot`). Gated by the `invoke_query` Cedar action at the boundary;
2280/// a stored *mutation* additionally passes the engine's `change` gate
2281/// (double-gated). An actor **without** `invoke_query` cannot tell a denied
2282/// query from a missing one — both return the same 404, so the catalog
2283/// can't be probed without the grant. Once `invoke_query` is held, the
2284/// inner `read`/`change` gate may surface a 403 for an existing query the
2285/// actor can't run (the intended double-gate signal).
2286async fn server_invoke_query(
2287    State(state): State<AppState>,
2288    Extension(handle): Extension<Arc<GraphHandle>>,
2289    actor: Option<Extension<ResolvedActor>>,
2290    Path(QueryNamePath { name }): Path<QueryNamePath>,
2291    body: Bytes,
2292) -> std::result::Result<Json<InvokeStoredQueryResponse>, ApiError> {
2293    let req = parse_optional_invoke_body(body)?;
2294    // A caller without `invoke_query` can't tell a denial from a missing
2295    // query: both 404 with this exact message, so the catalog can't be
2296    // probed without the grant. (A caller that holds invoke_query may still
2297    // see the inner gate's 403 for an existing query it can't run — intended.)
2298    const NOT_FOUND: &str = "stored query not found";
2299    let actor_ref = actor.as_ref().map(|Extension(actor)| actor);
2300
2301    // Boundary gate (authentication already ran in `require_bearer_auth`).
2302    // A denial is hidden as 404 (deny == missing, so the catalog can't be
2303    // probed without the grant), but operational failures (401 missing bearer,
2304    // 500 policy-evaluation error) propagate with their true status via `?`
2305    // rather than being masked as a missing query.
2306    match authorize(
2307        actor_ref,
2308        handle.policy.as_deref(),
2309        PolicyRequest {
2310            action: PolicyAction::InvokeQuery,
2311            // Graph-scoped: no branch dimension. The per-branch/snapshot
2312            // access is enforced by the inner read/change gate in the
2313            // runner, so the outer gate must not resolve a branch (doing so
2314            // was wrong for snapshot reads).
2315            branch: None,
2316            target_branch: None,
2317        },
2318    )? {
2319        Authz::Allowed => {}
2320        Authz::Denied(_) => return Err(ApiError::not_found(NOT_FOUND)),
2321    }
2322
2323    // Resolve against the per-graph registry (same 404 on a miss).
2324    let stored = handle
2325        .queries
2326        .as_ref()
2327        .and_then(|registry| registry.lookup(&name))
2328        .ok_or_else(|| ApiError::not_found(NOT_FOUND))?;
2329
2330    // Detach what we need before `handle` moves into the runner — the
2331    // registry borrow lives inside `handle`.
2332    let source = Arc::clone(&stored.source);
2333    let query_name = stored.name.clone();
2334    let is_mutation = stored.is_mutation();
2335
2336    info!(
2337        graph = %handle.uri,
2338        actor = ?actor_ref.map(|a| a.actor_id.as_ref()),
2339        query = %query_name,
2340        kind = if is_mutation { "mutate" } else { "read" },
2341        "stored query invoked"
2342    );
2343
2344    if is_mutation {
2345        if req.snapshot.is_some() {
2346            return Err(ApiError::bad_request(
2347                "stored mutation cannot target a snapshot",
2348            ));
2349        }
2350        let branch = req.branch.unwrap_or_else(|| "main".to_string());
2351        let output = run_mutate(
2352            state,
2353            handle,
2354            actor_ref,
2355            &source,
2356            Some(&query_name),
2357            req.params.as_ref(),
2358            branch,
2359        )
2360        .await?;
2361        Ok(Json(InvokeStoredQueryResponse::Change(output)))
2362    } else {
2363        let (selected, target, result) = run_query(
2364            handle,
2365            actor_ref,
2366            &source,
2367            Some(&query_name),
2368            req.params.as_ref(),
2369            req.branch,
2370            req.snapshot,
2371            true,
2372        )
2373        .await?;
2374        Ok(Json(InvokeStoredQueryResponse::Read(api::read_output(
2375            selected, &target, result,
2376        ))))
2377    }
2378}
2379
2380#[utoipa::path(
2381    get,
2382    path = "/queries",
2383    tag = "queries",
2384    operation_id = "list_queries",
2385    responses(
2386        (status = 200, description = "Stored-query catalog (the mcp.expose subset, with typed params)", body = QueriesCatalogOutput),
2387        (status = 401, description = "Unauthorized", body = ErrorOutput),
2388        (status = 403, description = "Forbidden", body = ErrorOutput),
2389    ),
2390    security(("bearer_token" = [])),
2391)]
2392/// List the graph's exposed stored queries as a typed tool catalog.
2393///
2394/// Returns the `mcp.expose == true` subset of the `queries:` registry, each
2395/// with its MCP tool name, read/mutate flag, description/instruction, and
2396/// typed parameters — enough for a client to register them as tools without
2397/// fetching `.gq` source. Read-gated; the catalog is graph-wide (branch
2398/// independent — `read` is authorized against `main`). **Not** Cedar-filtered
2399/// per query yet, so it can list a query whose `invoke_query` the caller
2400/// lacks (a known gap until per-query authorization lands).
2401async fn server_list_queries(
2402    Extension(handle): Extension<Arc<GraphHandle>>,
2403    actor: Option<Extension<ResolvedActor>>,
2404) -> std::result::Result<Json<QueriesCatalogOutput>, ApiError> {
2405    authorize_request(
2406        actor.as_ref().map(|Extension(actor)| actor),
2407        handle.policy.as_deref(),
2408        PolicyRequest {
2409            action: PolicyAction::Read,
2410            branch: Some("main".to_string()),
2411            target_branch: None,
2412        },
2413    )?;
2414    let queries = match handle.queries.as_ref() {
2415        Some(registry) => registry
2416            .iter()
2417            .filter(|q| q.expose)
2418            .map(api::query_catalog_entry)
2419            .collect(),
2420        None => Vec::new(),
2421    };
2422    Ok(Json(QueriesCatalogOutput { queries }))
2423}
2424
2425#[utoipa::path(
2426    get,
2427    path = "/schema",
2428    tag = "schema",
2429    operation_id = "getSchema",
2430    responses(
2431        (status = 200, description = "Current schema source", body = SchemaOutput),
2432        (status = 401, description = "Unauthorized", body = ErrorOutput),
2433        (status = 403, description = "Forbidden", body = ErrorOutput),
2434    ),
2435    security(("bearer_token" = [])),
2436)]
2437/// Read the current schema source.
2438///
2439/// Returns the project's schema as a single string in `.pg` source form.
2440/// Useful for clients that want to introspect available types and tables
2441/// before constructing GQ queries. Read-only.
2442async fn server_schema_get(
2443    Extension(handle): Extension<Arc<GraphHandle>>,
2444    actor: Option<Extension<ResolvedActor>>,
2445) -> std::result::Result<Json<SchemaOutput>, ApiError> {
2446    authorize_request(
2447        actor.as_ref().map(|Extension(actor)| actor),
2448        handle.policy.as_deref(),
2449        PolicyRequest {
2450            action: PolicyAction::Read,
2451            branch: None,
2452            target_branch: None,
2453        },
2454    )?;
2455    let schema_source = {
2456        let db = &handle.engine;
2457        db.schema_source().to_string()
2458    };
2459    Ok(Json(SchemaOutput { schema_source }))
2460}
2461
2462#[utoipa::path(
2463    post,
2464    path = "/schema/apply",
2465    tag = "mutations",
2466    operation_id = "applySchema",
2467    request_body = SchemaApplyRequest,
2468    responses(
2469        (status = 200, description = "Schema apply results", body = SchemaApplyOutput),
2470        (status = 400, description = "Bad request", body = ErrorOutput),
2471        (status = 401, description = "Unauthorized", body = ErrorOutput),
2472        (status = 403, description = "Forbidden", body = ErrorOutput),
2473        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2474    ),
2475    security(("bearer_token" = [])),
2476)]
2477/// Apply a schema migration.
2478///
2479/// Diffs `schema_source` against the current schema and applies the resulting
2480/// migration steps (add/drop type, add/drop column, etc.). **Destructive**:
2481/// some steps drop data. Returns the list of steps applied; if `applied` is
2482/// false the diff was unsupported and no changes were made.
2483async fn server_schema_apply(
2484    State(state): State<AppState>,
2485    Extension(handle): Extension<Arc<GraphHandle>>,
2486    actor: Option<Extension<ResolvedActor>>,
2487    Json(request): Json<SchemaApplyRequest>,
2488) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
2489    let actor_arc = actor
2490        .as_ref()
2491        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2492        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2493    let actor_id = actor
2494        .as_ref()
2495        .map(|Extension(actor)| actor.actor_id.as_ref());
2496    authorize_request(
2497        actor.as_ref().map(|Extension(actor)| actor),
2498        handle.policy.as_deref(),
2499        PolicyRequest {
2500            action: PolicyAction::SchemaApply,
2501            branch: None,
2502            target_branch: Some("main".to_string()),
2503        },
2504    )?;
2505    let est_bytes = request.schema_source.len() as u64;
2506    let _admission = state
2507        .workload
2508        .try_admit(&actor_arc, est_bytes)
2509        .map_err(ApiError::from_workload_reject)?;
2510    let result = {
2511        let db = &handle.engine;
2512        let registry = handle.queries.as_deref();
2513        let label = handle.key.graph_id.as_str().to_string();
2514        // Engine-layer policy enforcement (MR-722): pass the resolved
2515        // actor through so apply_schema_as can call enforce() with the
2516        // authoritative identity. With a policy installed in AppState,
2517        // engine-side enforcement re-checks the same decision the
2518        // HTTP-layer authorize_request just made above. PR #3 collapses
2519        // the redundancy.
2520        db.apply_schema_as_with_catalog_check(
2521            &request.schema_source,
2522            omnigraph::db::SchemaApplyOptions {
2523                allow_data_loss: request.allow_data_loss,
2524            },
2525            actor_id,
2526            |catalog| {
2527                if let Some(registry) = registry {
2528                    validate_registry_against_catalog(registry, catalog, &label)?;
2529                }
2530                Ok(())
2531            },
2532        )
2533        .await
2534        .map_err(ApiError::from_omni)?
2535    };
2536    Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
2537}
2538
2539#[utoipa::path(
2540    post,
2541    path = "/ingest",
2542    tag = "mutations",
2543    operation_id = "ingest",
2544    request_body = IngestRequest,
2545    responses(
2546        (status = 200, description = "Ingest results", body = IngestOutput),
2547        (status = 400, description = "Bad request", body = ErrorOutput),
2548        (status = 401, description = "Unauthorized", body = ErrorOutput),
2549        (status = 403, description = "Forbidden", body = ErrorOutput),
2550        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2551    ),
2552    security(("bearer_token" = [])),
2553)]
2554/// Bulk-ingest NDJSON data into a branch.
2555///
2556/// `data` is NDJSON with one record per line. `mode` controls behavior on
2557/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
2558/// `overwrite` replaces table contents. If `branch` does not exist it is
2559/// created from `from` (defaults to `main`). **Destructive** when `mode` is
2560/// `overwrite` or when ingest produces conflicting writes.
2561async fn server_ingest(
2562    State(state): State<AppState>,
2563    Extension(handle): Extension<Arc<GraphHandle>>,
2564    actor: Option<Extension<ResolvedActor>>,
2565    Json(request): Json<IngestRequest>,
2566) -> std::result::Result<Json<IngestOutput>, ApiError> {
2567    let branch = request.branch.unwrap_or_else(|| "main".to_string());
2568    let from = request.from.unwrap_or_else(|| "main".to_string());
2569    let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
2570    let actor_arc = actor
2571        .as_ref()
2572        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2573        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2574    let actor_id = actor
2575        .as_ref()
2576        .map(|Extension(actor)| actor.actor_id.as_ref());
2577
2578    let branch_exists = {
2579        let db = &handle.engine;
2580        db.branch_list()
2581            .await
2582            .map_err(ApiError::from_omni)?
2583            .into_iter()
2584            .any(|name| name == branch)
2585    };
2586
2587    if !branch_exists {
2588        authorize_request(
2589            actor.as_ref().map(|Extension(actor)| actor),
2590            handle.policy.as_deref(),
2591            PolicyRequest {
2592                action: PolicyAction::BranchCreate,
2593                branch: Some(from.clone()),
2594                target_branch: Some(branch.clone()),
2595            },
2596        )?;
2597    }
2598    authorize_request(
2599        actor.as_ref().map(|Extension(actor)| actor),
2600        handle.policy.as_deref(),
2601        PolicyRequest {
2602            action: PolicyAction::Change,
2603            branch: Some(branch.clone()),
2604            target_branch: None,
2605        },
2606    )?;
2607    let est_bytes = request.data.len() as u64;
2608    let _admission = state
2609        .workload
2610        .try_admit(&actor_arc, est_bytes)
2611        .map_err(ApiError::from_workload_reject)?;
2612
2613    let result = {
2614        let db = &handle.engine;
2615        db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
2616            .await
2617            .map_err(ApiError::from_omni)?
2618    };
2619
2620    Ok(Json(ingest_output(
2621        handle.uri.as_str(),
2622        &result,
2623        actor_id.map(str::to_string),
2624    )))
2625}
2626
2627#[utoipa::path(
2628    get,
2629    path = "/branches",
2630    tag = "branches",
2631    operation_id = "listBranches",
2632    responses(
2633        (status = 200, description = "List of branches", body = BranchListOutput),
2634        (status = 401, description = "Unauthorized", body = ErrorOutput),
2635        (status = 403, description = "Forbidden", body = ErrorOutput),
2636    ),
2637    security(("bearer_token" = [])),
2638)]
2639/// List all branches.
2640///
2641/// Returns branch names sorted alphabetically. Read-only.
2642async fn server_branch_list(
2643    Extension(handle): Extension<Arc<GraphHandle>>,
2644    actor: Option<Extension<ResolvedActor>>,
2645) -> std::result::Result<Json<BranchListOutput>, ApiError> {
2646    authorize_request(
2647        actor.as_ref().map(|Extension(actor)| actor),
2648        handle.policy.as_deref(),
2649        PolicyRequest {
2650            action: PolicyAction::Read,
2651            branch: None,
2652            target_branch: None,
2653        },
2654    )?;
2655    let mut branches = {
2656        let db = &handle.engine;
2657        db.branch_list().await.map_err(ApiError::from_omni)?
2658    };
2659    branches.sort();
2660    Ok(Json(BranchListOutput { branches }))
2661}
2662
2663#[utoipa::path(
2664    post,
2665    path = "/branches",
2666    tag = "branches",
2667    operation_id = "createBranch",
2668    request_body = BranchCreateRequest,
2669    responses(
2670        (status = 200, description = "Branch created", body = BranchCreateOutput),
2671        (status = 400, description = "Bad request", body = ErrorOutput),
2672        (status = 401, description = "Unauthorized", body = ErrorOutput),
2673        (status = 403, description = "Forbidden", body = ErrorOutput),
2674        (status = 409, description = "Branch already exists", body = ErrorOutput),
2675        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2676    ),
2677    security(("bearer_token" = [])),
2678)]
2679/// Create a new branch.
2680///
2681/// Forks `name` off of `from` (defaults to `main`). The new branch shares
2682/// table data with its parent until it is mutated. Returns 409 if `name`
2683/// already exists.
2684async fn server_branch_create(
2685    State(state): State<AppState>,
2686    Extension(handle): Extension<Arc<GraphHandle>>,
2687    actor: Option<Extension<ResolvedActor>>,
2688    Json(request): Json<BranchCreateRequest>,
2689) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
2690    let from = request.from.unwrap_or_else(|| "main".to_string());
2691    let actor_arc = actor
2692        .as_ref()
2693        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2694        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2695    authorize_request(
2696        actor.as_ref().map(|Extension(actor)| actor),
2697        handle.policy.as_deref(),
2698        PolicyRequest {
2699            action: PolicyAction::BranchCreate,
2700            branch: Some(from.clone()),
2701            target_branch: Some(request.name.clone()),
2702        },
2703    )?;
2704    // Branch metadata only — small constant bytes estimate. The Lance
2705    // shallow-clone work is bounded by the parent's manifest size, not
2706    // the request body.
2707    let _admission = state
2708        .workload
2709        .try_admit(&actor_arc, 256)
2710        .map_err(ApiError::from_workload_reject)?;
2711    {
2712        let db = &handle.engine;
2713        db.branch_create_from_as(
2714            ReadTarget::branch(&from),
2715            &request.name,
2716            actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
2717        )
2718        .await
2719        .map_err(ApiError::from_omni)?;
2720    }
2721    Ok(Json(BranchCreateOutput {
2722        uri: handle.uri.clone(),
2723        from,
2724        name: request.name,
2725        actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
2726    }))
2727}
2728
2729/// Path-param shape for [`server_branch_delete`]. Named-field
2730/// deserialization (rather than `Path<String>` or `Path<(String,)>`)
2731/// keeps the extractor stable across single-mode flat routes and
2732/// multi-mode nested routes: the `{branch}` capture is picked by
2733/// name and any other captures in scope (e.g. `{graph_id}` in
2734/// multi-mode) are ignored without breaking deserialization.
2735///
2736/// Closes the "handler path-extractor type is positional and breaks
2737/// when route nesting changes" class.
2738#[derive(Deserialize)]
2739struct BranchPath {
2740    branch: String,
2741}
2742
2743#[utoipa::path(
2744    delete,
2745    path = "/branches/{branch}",
2746    tag = "branches",
2747    operation_id = "deleteBranch",
2748    params(
2749        ("branch" = String, Path, description = "Branch name to delete"),
2750    ),
2751    responses(
2752        (status = 200, description = "Branch deleted", body = BranchDeleteOutput),
2753        (status = 401, description = "Unauthorized", body = ErrorOutput),
2754        (status = 403, description = "Forbidden", body = ErrorOutput),
2755        (status = 404, description = "Branch not found", body = ErrorOutput),
2756        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2757    ),
2758    security(("bearer_token" = [])),
2759)]
2760/// Delete a branch.
2761///
2762/// **Irreversible.** Removes the branch pointer; commits remain reachable
2763/// only if referenced by another branch. Returns 404 if the branch does not
2764/// exist.
2765async fn server_branch_delete(
2766    State(state): State<AppState>,
2767    Extension(handle): Extension<Arc<GraphHandle>>,
2768    actor: Option<Extension<ResolvedActor>>,
2769    Path(BranchPath { branch }): Path<BranchPath>,
2770) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
2771    let actor_arc = actor
2772        .as_ref()
2773        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2774        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2775    let actor_id = actor
2776        .as_ref()
2777        .map(|Extension(actor)| actor.actor_id.as_ref());
2778    authorize_request(
2779        actor.as_ref().map(|Extension(actor)| actor),
2780        handle.policy.as_deref(),
2781        PolicyRequest {
2782            action: PolicyAction::BranchDelete,
2783            branch: None,
2784            target_branch: Some(branch.clone()),
2785        },
2786    )?;
2787    // Metadata-only manifest tombstone — small constant estimate.
2788    let _admission = state
2789        .workload
2790        .try_admit(&actor_arc, 256)
2791        .map_err(ApiError::from_workload_reject)?;
2792    {
2793        let db = &handle.engine;
2794        db.branch_delete_as(&branch, actor_id)
2795            .await
2796            .map_err(ApiError::from_omni)?;
2797    }
2798    Ok(Json(BranchDeleteOutput {
2799        uri: handle.uri.clone(),
2800        name: branch,
2801        actor_id: actor_id.map(str::to_string),
2802    }))
2803}
2804
2805#[utoipa::path(
2806    post,
2807    path = "/branches/merge",
2808    tag = "branches",
2809    operation_id = "mergeBranches",
2810    request_body = BranchMergeRequest,
2811    responses(
2812        (status = 200, description = "Branches merged", body = BranchMergeOutput),
2813        (status = 400, description = "Bad request", body = ErrorOutput),
2814        (status = 401, description = "Unauthorized", body = ErrorOutput),
2815        (status = 403, description = "Forbidden", body = ErrorOutput),
2816        (status = 409, description = "Merge conflict", body = ErrorOutput),
2817        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2818    ),
2819    security(("bearer_token" = [])),
2820)]
2821/// Merge one branch into another.
2822///
2823/// Merges `source` into `target` (defaults to `main`). Outcome is one of
2824/// `already_up_to_date`, `fast_forward`, or `merged`. Returns 409 with the
2825/// list of conflicts if the merge cannot be completed; the target is left
2826/// unchanged in that case. **Destructive** to `target` on success.
2827async fn server_branch_merge(
2828    State(state): State<AppState>,
2829    Extension(handle): Extension<Arc<GraphHandle>>,
2830    actor: Option<Extension<ResolvedActor>>,
2831    Json(request): Json<BranchMergeRequest>,
2832) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
2833    let target = request.target.unwrap_or_else(|| "main".to_string());
2834    let actor_arc = actor
2835        .as_ref()
2836        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2837        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2838    let actor_id = actor
2839        .as_ref()
2840        .map(|Extension(actor)| actor.actor_id.as_ref());
2841    authorize_request(
2842        actor.as_ref().map(|Extension(actor)| actor),
2843        handle.policy.as_deref(),
2844        PolicyRequest {
2845            action: PolicyAction::BranchMerge,
2846            branch: Some(request.source.clone()),
2847            target_branch: Some(target.clone()),
2848        },
2849    )?;
2850    // Merge body is small JSON; the heavy work is in the engine but is
2851    // bounded per-(table, branch) by the writer queue. Small constant
2852    // estimate suffices for the actor in-flight count.
2853    let _admission = state
2854        .workload
2855        .try_admit(&actor_arc, 256)
2856        .map_err(ApiError::from_workload_reject)?;
2857    let outcome = {
2858        let db = &handle.engine;
2859        db.branch_merge_as(&request.source, &target, actor_id)
2860            .await
2861            .map_err(ApiError::from_omni)?
2862    };
2863    Ok(Json(BranchMergeOutput {
2864        source: request.source,
2865        target,
2866        outcome: outcome.into(),
2867        actor_id: actor_id.map(str::to_string),
2868    }))
2869}
2870
2871#[utoipa::path(
2872    get,
2873    path = "/commits",
2874    tag = "commits",
2875    operation_id = "listCommits",
2876    params(CommitListQuery),
2877    responses(
2878        (status = 200, description = "List of commits", body = CommitListOutput),
2879        (status = 401, description = "Unauthorized", body = ErrorOutput),
2880        (status = 403, description = "Forbidden", body = ErrorOutput),
2881    ),
2882    security(("bearer_token" = [])),
2883)]
2884/// List commits.
2885///
2886/// Filter by `branch` to get the commits on a single branch (most recent
2887/// first); omit to list across all branches. Read-only.
2888async fn server_commit_list(
2889    Extension(handle): Extension<Arc<GraphHandle>>,
2890    actor: Option<Extension<ResolvedActor>>,
2891    Query(query): Query<CommitListQuery>,
2892) -> std::result::Result<Json<CommitListOutput>, ApiError> {
2893    authorize_request(
2894        actor.as_ref().map(|Extension(actor)| actor),
2895        handle.policy.as_deref(),
2896        PolicyRequest {
2897            action: PolicyAction::Read,
2898            branch: query.branch.clone(),
2899            target_branch: None,
2900        },
2901    )?;
2902    let commits = {
2903        let db = &handle.engine;
2904        db.list_commits(query.branch.as_deref())
2905            .await
2906            .map_err(ApiError::from_omni)?
2907    };
2908    Ok(Json(CommitListOutput {
2909        commits: commits.iter().map(api::commit_output).collect(),
2910    }))
2911}
2912
2913/// Path-param shape for [`server_commit_show`]. See [`BranchPath`]
2914/// for the design rationale — same pattern, different field name.
2915#[derive(Deserialize)]
2916struct CommitPath {
2917    commit_id: String,
2918}
2919
2920#[utoipa::path(
2921    get,
2922    path = "/commits/{commit_id}",
2923    tag = "commits",
2924    operation_id = "getCommit",
2925    params(
2926        ("commit_id" = String, Path, description = "Commit identifier"),
2927    ),
2928    responses(
2929        (status = 200, description = "Commit details", body = api::CommitOutput),
2930        (status = 401, description = "Unauthorized", body = ErrorOutput),
2931        (status = 403, description = "Forbidden", body = ErrorOutput),
2932        (status = 404, description = "Commit not found", body = ErrorOutput),
2933    ),
2934    security(("bearer_token" = [])),
2935)]
2936
2937/// Get a single commit.
2938///
2939/// Returns the commit's manifest version, parent commit(s), and creation
2940/// metadata. Read-only.
2941async fn server_commit_show(
2942    Extension(handle): Extension<Arc<GraphHandle>>,
2943    actor: Option<Extension<ResolvedActor>>,
2944    Path(CommitPath { commit_id }): Path<CommitPath>,
2945) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
2946    authorize_request(
2947        actor.as_ref().map(|Extension(actor)| actor),
2948        handle.policy.as_deref(),
2949        PolicyRequest {
2950            action: PolicyAction::Read,
2951            branch: None,
2952            target_branch: None,
2953        },
2954    )?;
2955    let commit = {
2956        let db = &handle.engine;
2957        db.get_commit(&commit_id)
2958            .await
2959            .map_err(ApiError::from_omni)?
2960    };
2961    Ok(Json(api::commit_output(&commit)))
2962}
2963
2964fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
2965    if let Some(snapshot) = snapshot {
2966        ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
2967    } else {
2968        ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
2969    }
2970}
2971
2972fn select_named_query_decl(
2973    query_source: &str,
2974    requested_name: Option<&str>,
2975) -> Result<omnigraph_compiler::query::ast::QueryDecl> {
2976    let parsed = parse_query(query_source)?;
2977    let query = if let Some(name) = requested_name {
2978        parsed
2979            .queries
2980            .into_iter()
2981            .find(|query| query.name == name)
2982            .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
2983    } else if parsed.queries.len() == 1 {
2984        parsed.queries.into_iter().next().unwrap()
2985    } else {
2986        bail!("query file contains multiple queries; pass --name");
2987    };
2988    Ok(query)
2989}
2990
2991fn select_named_query(
2992    query_source: &str,
2993    requested_name: Option<&str>,
2994) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
2995    let query = select_named_query_decl(query_source, requested_name)?;
2996    Ok((query.name, query.params))
2997}
2998
2999fn query_params_from_json(
3000    query_params: &[omnigraph_compiler::query::ast::Param],
3001    params_json: Option<&Value>,
3002) -> Result<ParamMap> {
3003    json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
3004        .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
3005}
3006
/// Trims surrounding whitespace from an optional value.
///
/// Returns `None` when the input is `None` or is empty after trimming;
/// otherwise returns the trimmed text.
fn normalize_bearer_token(value: Option<String>) -> Option<String> {
    let trimmed = value?.trim().to_string();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
3012
3013fn normalize_bearer_actor(value: String) -> Result<String> {
3014    let value = value.trim().to_string();
3015    if value.is_empty() {
3016        bail!("bearer token actor names must not be blank");
3017    }
3018    Ok(value)
3019}
3020
3021fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
3022    let entries: HashMap<String, String> = serde_json::from_str(value)
3023        .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
3024    Ok(entries.into_iter().collect())
3025}
3026
3027fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
3028    let contents = fs::read_to_string(path)
3029        .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
3030    parse_bearer_tokens_json(&contents)
3031        .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
3032}
3033
3034fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
3035    let mut seen_actors = HashSet::new();
3036    let mut seen_tokens = HashSet::new();
3037    let mut normalized = Vec::with_capacity(entries.len());
3038
3039    for (actor, token) in entries {
3040        let actor = normalize_bearer_actor(actor)?;
3041        let Some(token) = normalize_bearer_token(Some(token)) else {
3042            bail!("bearer token for actor '{actor}' must not be blank");
3043        };
3044        if !seen_actors.insert(actor.clone()) {
3045            bail!("duplicate bearer token actor '{actor}'");
3046        }
3047        if !seen_tokens.insert(token.clone()) {
3048            bail!("duplicate bearer token value configured");
3049        }
3050        normalized.push((actor, token));
3051    }
3052
3053    normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
3054    Ok(normalized)
3055}
3056
3057fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
3058    let mut entries = Vec::new();
3059
3060    if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
3061    {
3062        entries.push(("default".to_string(), token));
3063    }
3064
3065    if let Some(path) =
3066        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
3067    {
3068        entries.extend(read_bearer_tokens_file(&path)?);
3069    } else if let Some(json) =
3070        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
3071    {
3072        entries.extend(parse_bearer_tokens_json(&json)?);
3073    }
3074
3075    validate_bearer_tokens(entries)
3076}
3077
3078#[cfg(test)]
3079mod tests {
3080    use super::{
3081        GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
3082        classify_server_runtime_state, hash_bearer_token, load_server_settings,
3083        normalize_bearer_token, parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
3084    };
3085    use serial_test::serial;
3086    use std::env;
3087    use std::fs;
3088    use tempfile::tempdir;
3089
3090    /// `authorize` returns the allow/deny **decision** (`Authz`) and reserves
3091    /// `Err` for operational failures, so the invoke handler can hide a denial
3092    /// as 404 without also masking a 401/500. Pins each outcome.
3093    #[test]
3094    fn authorize_splits_decision_from_operational_error() {
3095        use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
3096        use std::sync::Arc;
3097
3098        fn req(action: PolicyAction) -> PolicyRequest {
3099            PolicyRequest { action, branch: None, target_branch: None }
3100        }
3101        let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
3102
3103        // --- No policy engine installed (open / default-deny modes) ---
3104        // A server-scoped action is denied in every no-policy state.
3105        assert!(matches!(
3106            authorize(Some(&actor), None, req(PolicyAction::GraphList)).unwrap(),
3107            Authz::Denied(_)
3108        ));
3109        // Authenticated actor + a non-read per-graph action → default-deny.
3110        assert!(matches!(
3111            authorize(Some(&actor), None, req(PolicyAction::Change)).unwrap(),
3112            Authz::Denied(_)
3113        ));
3114        // `read` is the one per-graph action permitted without a policy.
3115        assert!(matches!(
3116            authorize(Some(&actor), None, req(PolicyAction::Read)).unwrap(),
3117            Authz::Allowed
3118        ));
3119        // Open mode (no actor, no policy) → allowed.
3120        assert!(matches!(
3121            authorize(None, None, req(PolicyAction::Read)).unwrap(),
3122            Authz::Allowed
3123        ));
3124
3125        // --- Policy engine installed ---
3126        let policy: PolicyConfig = serde_yaml::from_str(
3127            "version: 1\n\
3128             groups:\n  team: [act-alice]\n\
3129             rules:\n  - id: team-read\n    allow:\n      actors: { group: team }\n      actions: [read]\n      branch_scope: any\n",
3130        )
3131        .unwrap();
3132        let engine = PolicyCompiler::compile(&policy, "graph").unwrap();
3133
3134        // A matched allow rule → Allowed.
3135        assert!(matches!(
3136            authorize(
3137                Some(&actor),
3138                Some(&engine),
3139                PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
3140            )
3141            .unwrap(),
3142            Authz::Allowed
3143        ));
3144        // Known actor, no matching allow rule → Denied, carrying the decision message.
3145        match authorize(
3146            Some(&actor),
3147            Some(&engine),
3148            PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
3149        )
3150        .unwrap()
3151        {
3152            Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
3153            Authz::Allowed => panic!("change must be denied: only read is allowed"),
3154        }
3155        // Policy installed but no actor → operational failure (`Err`), NOT a
3156        // decision. This is the split that keeps a 401/500 from being masked
3157        // as the denial's response in the invoke handler.
3158        assert!(
3159            authorize(None, Some(&engine), req(PolicyAction::Read)).is_err(),
3160            "a missing actor with a policy installed is an operational error, not a deny"
3161        );
3162    }
3163
3164    #[test]
3165    fn hash_bearer_token_produces_32_byte_output() {
3166        let hash = hash_bearer_token("any-token");
3167        assert_eq!(hash.len(), 32);
3168    }
3169
3170    /// The single gate both open paths funnel through: it refuses a
3171    /// schema breakage (naming the graph label + query), attaches a clean
3172    /// registry, and collapses an empty one to `None`. Pure over its args
3173    /// (no engine), so it covers the multi-graph path's logic too — the
3174    /// only per-path difference is the `label`, asserted here.
3175    #[test]
3176    fn validate_and_attach_gates_on_schema_and_collapses_empty() {
3177        use crate::queries::{QueryRegistry, RegistrySpec};
3178        use omnigraph_compiler::catalog::build_catalog;
3179        use omnigraph_compiler::schema::parser::parse_schema;
3180
3181        let schema = parse_schema("node User {\nname: String\n}\n").unwrap();
3182        let catalog = build_catalog(&schema).unwrap();
3183        let spec = |name: &str, source: &str| RegistrySpec {
3184            name: name.to_string(),
3185            source: source.to_string(),
3186            expose: false,
3187            tool_name: None,
3188        };
3189
3190        // Empty registry → nothing attached, no error.
3191        let empty =
3192            super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
3193        assert!(empty.is_none());
3194
3195        // A query that type-checks → attached.
3196        let ok = QueryRegistry::from_specs(vec![spec(
3197            "find_user",
3198            "query find_user() { match { $u: User } return { $u.name } }",
3199        )])
3200        .unwrap();
3201        assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
3202
3203        // A query referencing a type the schema lacks → boot refusal that
3204        // names both the graph label and the offending query.
3205        let broken = QueryRegistry::from_specs(vec![spec(
3206            "ghost",
3207            "query ghost() { match { $w: Widget } return { $w.name } }",
3208        )])
3209        .unwrap();
3210        let err = super::validate_and_attach(broken, &catalog, "graph-x").unwrap_err();
3211        let msg = err.to_string();
3212        assert!(msg.contains("graph-x"), "labels the graph: {msg}");
3213        assert!(msg.contains("ghost"), "names the query: {msg}");
3214        assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
3215    }
3216
3217    #[test]
3218    fn hash_bearer_token_is_deterministic() {
3219        assert_eq!(
3220            hash_bearer_token("stable-input"),
3221            hash_bearer_token("stable-input"),
3222        );
3223    }
3224
3225    #[test]
3226    fn hash_bearer_token_differs_for_different_inputs() {
3227        assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
3228    }
3229
3230    #[test]
3231    fn hash_bearer_token_matches_known_sha256_vector() {
3232        // SHA-256("abc"). If this ever fails, the hash function was swapped.
3233        let hash = hash_bearer_token("abc");
3234        let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
3235        assert_eq!(
3236            hex,
3237            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
3238        );
3239    }
3240
3241    #[test]
3242    fn server_settings_load_from_yaml_config() {
3243        let temp = tempdir().unwrap();
3244        let config = temp.path().join("omnigraph.yaml");
3245        fs::write(
3246            &config,
3247            r#"
3248graphs:
3249  local:
3250    uri: /tmp/demo.omni
3251server:
3252  graph: local
3253  bind: 0.0.0.0:9090
3254"#,
3255        )
3256        .unwrap();
3257
3258        let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
3259        match &settings.mode {
3260            ServerConfigMode::Single { uri, graph_id, .. } => {
3261                assert_eq!(uri, "/tmp/demo.omni");
3262                assert_eq!(graph_id, "local");
3263            }
3264            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3265        }
3266        assert_eq!(settings.bind, "0.0.0.0:9090");
3267    }
3268
3269    #[test]
3270    fn server_settings_cli_flags_override_yaml_config() {
3271        let temp = tempdir().unwrap();
3272        let config = temp.path().join("omnigraph.yaml");
3273        fs::write(
3274            &config,
3275            r#"
3276graphs:
3277  local:
3278    uri: /tmp/demo.omni
3279server:
3280  graph: local
3281  bind: 127.0.0.1:8080
3282"#,
3283        )
3284        .unwrap();
3285
3286        let settings = load_server_settings(
3287            Some(&config),
3288            Some("/tmp/override.omni".to_string()),
3289            None,
3290            Some("0.0.0.0:9999".to_string()),
3291            false,
3292        )
3293        .unwrap();
3294        match &settings.mode {
3295            ServerConfigMode::Single { uri, graph_id, .. } => {
3296                assert_eq!(uri, "/tmp/override.omni");
3297                assert_eq!(graph_id, "/tmp/override.omni");
3298            }
3299            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3300        }
3301        assert_eq!(settings.bind, "0.0.0.0:9999");
3302    }
3303
3304    #[test]
3305    fn server_settings_can_resolve_named_target() {
3306        let temp = tempdir().unwrap();
3307        let config = temp.path().join("omnigraph.yaml");
3308        fs::write(
3309            &config,
3310            r#"
3311graphs:
3312  local:
3313    uri: ./demo.omni
3314  dev:
3315    uri: http://127.0.0.1:8080
3316server:
3317  graph: local
3318  bind: 127.0.0.1:8080
3319"#,
3320        )
3321        .unwrap();
3322
3323        let settings =
3324            load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
3325                .unwrap();
3326        match &settings.mode {
3327            ServerConfigMode::Single { uri, graph_id, .. } => {
3328                assert_eq!(uri, "http://127.0.0.1:8080");
3329                assert_eq!(graph_id, "dev");
3330            }
3331            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3332        }
3333    }
3334
3335    #[test]
3336    fn server_settings_require_uri_from_cli_or_config() {
3337        let error = load_server_settings(None, None, None, None, false).unwrap_err();
3338        assert!(
3339            error.to_string().contains("no graph to serve"),
3340            "expected mode-inference error, got: {error}",
3341        );
3342    }
3343
3344    #[test]
3345    fn classify_open_requires_explicit_unauthenticated_flag() {
3346        // State 1: no tokens, no policy, no flag → refuse to start.
3347        let error = classify_server_runtime_state(false, false, false).unwrap_err();
3348        let msg = error.to_string();
3349        assert!(
3350            msg.contains("--unauthenticated"),
3351            "expected refusal message mentioning --unauthenticated, got: {msg}"
3352        );
3353
3354        // Same matrix cell but with the flag set → Open mode permitted.
3355        assert_eq!(
3356            classify_server_runtime_state(false, false, true).unwrap(),
3357            ServerRuntimeState::Open
3358        );
3359    }
3360
3361    #[test]
3362    fn classify_tokens_without_policy_is_default_deny() {
3363        // State 2: tokens configured, no policy → DefaultDeny regardless
3364        // of the flag (the flag opts into the fully-open dev mode; it
3365        // doesn't downgrade default-deny back to open).
3366        assert_eq!(
3367            classify_server_runtime_state(true, false, false).unwrap(),
3368            ServerRuntimeState::DefaultDeny
3369        );
3370        assert_eq!(
3371            classify_server_runtime_state(true, false, true).unwrap(),
3372            ServerRuntimeState::DefaultDeny
3373        );
3374    }
3375
3376    #[tokio::test]
3377    #[serial]
3378    async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
3379        // Bug 2 from the bot-review pass: multi-mode startup was missing
3380        // the "policy requires tokens" check that single-mode enforces.
3381        // After centralizing the check in `classify_server_runtime_state`,
3382        // both modes get the same enforcement. This test guards the
3383        // multi-mode propagation path.
3384        //
3385        // Sibling test below pins single mode. Together they pin that
3386        // the classifier is called from both branches of `serve()`.
3387        let _guard = EnvGuard::set(&[
3388            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
3389            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
3390            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3391            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
3392            ("OMNIGRAPH_UNAUTHENTICATED", None),
3393        ]);
3394        let temp = tempdir().unwrap();
3395        // The classifier reads `has_policy_configured` from the config
3396        // shape (does the Option contain a path?), not from file
3397        // existence, so we can hand it a path without writing a real
3398        // policy file — the bail fires before policy load.
3399        let policy_path = temp.path().join("server-policy.yaml");
3400        let config = ServerConfig {
3401            mode: ServerConfigMode::Multi {
3402                graphs: vec![GraphStartupConfig {
3403                    graph_id: "alpha".to_string(),
3404                    uri: temp
3405                        .path()
3406                        .join("alpha.omni")
3407                        .to_string_lossy()
3408                        .into_owned(),
3409                    policy_file: None,
3410                    queries: crate::queries::QueryRegistry::default(),
3411                }],
3412                config_path: temp.path().join("omnigraph.yaml"),
3413                server_policy_file: Some(policy_path),
3414            },
3415            bind: "127.0.0.1:0".to_string(),
3416            allow_unauthenticated: false,
3417        };
3418        let result = serve(config).await;
3419        let err = result
3420            .expect_err("serve should refuse to start in multi mode with policy but no tokens");
3421        let msg = format!("{:?}", err);
3422        assert!(
3423            msg.contains("policy file is configured but no bearer tokens"),
3424            "expected policy-without-tokens rejection in multi mode, got: {msg}",
3425        );
3426    }
3427
3428    #[tokio::test]
3429    #[serial]
3430    async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
3431        // MR-723 PR A: pin the integration boundary that the classifier
3432        // is actually called by `serve()` before any side-effecting
3433        // work (Lance dataset open, TcpListener::bind). The classifier
3434        // itself is unit-tested above; this test guards the propagation
3435        // path from `classify_server_runtime_state` through serve's
3436        // `?` so a future refactor that drops the call returns red.
3437        //
3438        // Marked `#[serial]` because we have to clear all bearer-token
3439        // env vars, and another test in this module setting any of them
3440        // concurrently would corrupt the read inside `resolve_token_source`.
3441        let _guard = EnvGuard::set(&[
3442            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
3443            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
3444            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3445            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
3446            ("OMNIGRAPH_UNAUTHENTICATED", None),
3447        ]);
3448        let temp = tempdir().unwrap();
3449        // Graph path doesn't need to exist — classifier fires before
3450        // `AppState::open_with_bearer_tokens_and_policy`.
3451        let config = ServerConfig {
3452            mode: ServerConfigMode::Single {
3453                uri: temp
3454                    .path()
3455                    .join("graph.omni")
3456                    .to_string_lossy()
3457                    .into_owned(),
3458                graph_id: "default".to_string(),
3459                policy_file: None,
3460                queries: crate::queries::QueryRegistry::default(),
3461            },
3462            bind: "127.0.0.1:0".to_string(),
3463            allow_unauthenticated: false,
3464        };
3465        let result = serve(config).await;
3466        let err =
3467            result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
3468        let msg = format!("{:?}", err);
3469        assert!(
3470            msg.contains("no bearer tokens") || msg.contains("policy file"),
3471            "expected refusal message naming the misconfiguration, got: {msg}",
3472        );
3473    }
3474
3475    #[test]
3476    #[serial]
3477    fn unauthenticated_env_var_classification() {
3478        // MR-723 PR A: closes the gap where the env-var read path inside
3479        // `load_server_settings` was structurally implemented but not
3480        // exercised by any test. Three properties to pin, all in one
3481        // sequential test because `cargo test` runs the mod test suite
3482        // in parallel and `OMNIGRAPH_UNAUTHENTICATED` is process-global
3483        // — interleaving with another test that sets the same env var
3484        // (concurrent classifier tests, even the bearer-token suite
3485        // sharing `EnvGuard`) corrupts the read. Sequential within one
3486        // test fn is the simplest race-free shape.
3487        let temp = tempdir().unwrap();
3488        let config_path = temp.path().join("omnigraph.yaml");
3489        fs::write(
3490            &config_path,
3491            r#"
3492graphs:
3493  local:
3494    uri: /tmp/demo-unauth.omni
3495server:
3496  graph: local
3497"#,
3498        )
3499        .unwrap();
3500
3501        // Truthy values flip Open mode on, even with CLI flag off.
3502        for value in ["1", "true", "yes", "TRUE", "anything"] {
3503            let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
3504            let settings = load_server_settings(Some(&config_path), None, None, None, false)
3505                .expect("settings load should succeed");
3506            assert!(
3507                settings.allow_unauthenticated,
3508                "OMNIGRAPH_UNAUTHENTICATED={value:?} should enable Open mode",
3509            );
3510        }
3511
3512        // Falsy values keep refusal behavior, even with CLI flag off.
3513        for value in ["0", "false", "FALSE", ""] {
3514            let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
3515            let settings = load_server_settings(Some(&config_path), None, None, None, false)
3516                .expect("settings load should succeed");
3517            assert!(
3518                !settings.allow_unauthenticated,
3519                "OMNIGRAPH_UNAUTHENTICATED={value:?} should NOT enable Open mode",
3520            );
3521        }
3522
3523        // Unset env var: also false.
3524        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
3525        let settings = load_server_settings(Some(&config_path), None, None, None, false)
3526            .expect("settings load should succeed");
3527        assert!(
3528            !settings.allow_unauthenticated,
3529            "OMNIGRAPH_UNAUTHENTICATED unset should NOT enable Open mode",
3530        );
3531        drop(_guard);
3532
3533        // CLI flag wins even when env is falsy — `serve()` honors the
3534        // OR of both inputs.
3535        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
3536        let settings = load_server_settings(Some(&config_path), None, None, None, true)
3537            .expect("settings load should succeed");
3538        assert!(
3539            settings.allow_unauthenticated,
3540            "--unauthenticated CLI flag should win even when env is falsy",
3541        );
3542    }
3543
3544    #[test]
3545    fn classify_policy_enabled_requires_tokens() {
3546        // State 3: tokens + policy → PolicyEnabled, regardless of the
3547        // `allow_unauthenticated` flag (Cedar evaluates the bearer,
3548        // the flag is moot once tokens exist).
3549        assert_eq!(
3550            classify_server_runtime_state(true, true, false).unwrap(),
3551            ServerRuntimeState::PolicyEnabled
3552        );
3553        assert_eq!(
3554            classify_server_runtime_state(true, true, true).unwrap(),
3555            ServerRuntimeState::PolicyEnabled
3556        );
3557    }
3558
3559    #[test]
3560    fn classify_policy_without_tokens_is_rejected() {
3561        // Closes the "policy installed but no tokens → silent 401 on
3562        // every request" footgun. The same shape that single-mode
3563        // `open_with_bearer_tokens_and_policy` used to bail on
3564        // privately is now rejected by the classifier so both single
3565        // and multi mode get the same enforcement from one source of
3566        // truth.
3567        for allow_unauthenticated in [false, true] {
3568            let err =
3569                classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
3570            let msg = err.to_string();
3571            assert!(
3572                msg.contains("policy file is configured but no bearer tokens"),
3573                "expected policy-without-tokens rejection message; got: {msg}"
3574            );
3575            assert!(
3576                msg.contains("every request would 401"),
3577                "rejection message must name the failure mode; got: {msg}"
3578            );
3579        }
3580    }
3581
3582    #[test]
3583    fn normalize_bearer_token_trims_and_filters_blank_values() {
3584        assert_eq!(normalize_bearer_token(None), None);
3585        assert_eq!(normalize_bearer_token(Some("   ".to_string())), None);
3586        assert_eq!(
3587            normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
3588            Some("demo-token")
3589        );
3590    }
3591
3592    struct EnvGuard {
3593        saved: Vec<(&'static str, Option<String>)>,
3594    }
3595
3596    impl EnvGuard {
3597        fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
3598            let saved = vars
3599                .iter()
3600                .map(|(name, _)| (*name, env::var(name).ok()))
3601                .collect::<Vec<_>>();
3602            for (name, value) in vars {
3603                unsafe {
3604                    match value {
3605                        Some(value) => env::set_var(name, value),
3606                        None => env::remove_var(name),
3607                    }
3608                }
3609            }
3610            Self { saved }
3611        }
3612    }
3613
3614    impl Drop for EnvGuard {
3615        fn drop(&mut self) {
3616            for (name, value) in self.saved.drain(..) {
3617                unsafe {
3618                    match value {
3619                        Some(value) => env::set_var(name, value),
3620                        None => env::remove_var(name),
3621                    }
3622                }
3623            }
3624        }
3625    }
3626
3627    #[test]
3628    fn parse_bearer_tokens_json_reads_actor_token_map() {
3629        let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
3630        assert_eq!(tokens.len(), 2);
3631        assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
3632        assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
3633    }
3634
3635    #[test]
3636    #[serial]
3637    fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
3638        let temp = tempdir().unwrap();
3639        let tokens_path = temp.path().join("tokens.json");
3640        fs::write(
3641            &tokens_path,
3642            r#"{"team-01":"token-one","team-02":"token-two"}"#,
3643        )
3644        .unwrap();
3645
3646        let _guard = EnvGuard::set(&[
3647            ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
3648            (
3649                "OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
3650                Some(tokens_path.to_str().unwrap()),
3651            ),
3652            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3653        ]);
3654
3655        let tokens = server_bearer_tokens_from_env().unwrap();
3656        assert_eq!(
3657            tokens,
3658            vec![
3659                ("default".to_string(), "legacy-token".to_string()),
3660                ("team-01".to_string(), "token-one".to_string()),
3661                ("team-02".to_string(), "token-two".to_string()),
3662            ]
3663        );
3664    }
3665}