Skip to main content

omnigraph_server/
lib.rs

1pub mod api;
2pub mod auth;
3pub mod config;
4pub mod graph_id;
5pub mod identity;
6pub mod policy;
7pub mod registry;
8pub mod workload;
9
10pub use graph_id::GraphId;
11pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
12pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
13
14use std::collections::{HashMap, HashSet};
15use std::fs;
16use std::io;
17use std::io::Write;
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use api::{
22    BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
23    BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
24    CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
25    HealthOutput, IngestOutput, IngestRequest, QueryRequest, ReadOutput, ReadRequest,
26    SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
27    schema_apply_output, snapshot_payload,
28};
29pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
30use axum::body::{Body, Bytes};
31use axum::extract::DefaultBodyLimit;
32use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
33use axum::http::StatusCode;
34use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
35use axum::middleware::{self, Next};
36use axum::response::{IntoResponse, Response};
37use axum::routing::{delete, get, post};
38use axum::{Json, Router};
39use color_eyre::eyre::{Result, WrapErr, bail};
40pub use config::{
41    AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
42    ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
43    load_config,
44};
45use futures::stream;
46use omnigraph::db::{Omnigraph, ReadTarget};
47use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
48use omnigraph::storage::normalize_root_uri;
49use omnigraph_compiler::json_params_to_param_map;
50use omnigraph_compiler::query::parser::parse_query;
51use omnigraph_compiler::{JsonParamMode, ParamMap};
52pub use policy::{
53    PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
54    PolicyRequest, PolicyResourceKind, PolicyTestConfig,
55};
56use serde::Deserialize;
57use serde_json::Value;
58use sha2::{Digest, Sha256};
59use subtle::ConstantTimeEq;
60use tokio::net::TcpListener;
61use tokio::sync::mpsc;
62use tower_http::trace::TraceLayer;
63use tracing::{error, info, warn};
64use tracing_subscriber::EnvFilter;
65use utoipa::OpenApi;
66use utoipa::openapi::path::{Parameter, ParameterIn};
67use utoipa::openapi::schema::{Object, Type};
68use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
69
70type BearerTokenHash = [u8; 32];
71
72fn hash_bearer_token(token: &str) -> BearerTokenHash {
73    let digest = Sha256::digest(token.as_bytes());
74    let mut out = [0u8; 32];
75    out.copy_from_slice(&digest);
76    out
77}
78
// OpenAPI document for the Omnigraph HTTP API, generated by utoipa's
// `OpenApi` derive from the handler functions listed in `paths(...)`.
// The `SecurityAddon` modifier runs on the generated document and
// registers the `bearer_token` HTTP Bearer security scheme.
#[derive(OpenApi)]
#[openapi(
    info(
        title = "Omnigraph API",
        description = "HTTP API for the Omnigraph graph database",
    ),
    paths(
        server_health,
        server_graphs_list,
        server_snapshot,
        // deprecated; the #[deprecated] attribute on the handler
        // surfaces as `deprecated: true` on the OpenAPI operation.
        #[allow(deprecated)] server_read,
        server_query,
        server_export,
        #[allow(deprecated)] server_change,
        server_mutate,
        server_schema_apply,
        server_schema_get,
        server_ingest,
        server_branch_list,
        server_branch_create,
        server_branch_delete,
        server_branch_merge,
        server_commit_list,
        server_commit_show,
    ),
    modifiers(&SecurityAddon),
)]
pub struct ApiDoc;
109
110struct SecurityAddon;
111
112impl utoipa::Modify for SecurityAddon {
113    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
114        openapi
115            .components
116            .get_or_insert_with(Default::default)
117            .add_security_scheme(
118                "bearer_token",
119                SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
120            );
121    }
122}
123
/// Default request-body size limit, in bytes (1 MiB).
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1024 * 1024;
/// Request-body size limit for ingest, in bytes (32 MiB).
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 << 20;
/// Crate version, baked in at compile time from Cargo's `CARGO_PKG_VERSION`.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Optional source revision, captured from `OMNIGRAPH_SOURCE_VERSION` at
/// compile time; `None` when that variable was unset during the build.
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
128
/// Resolved server startup settings. Produced by `load_server_settings`
/// from the config file, the CLI arguments and the
/// `OMNIGRAPH_UNAUTHENTICATED` environment variable.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Server topology + the graphs to open at startup. Single-mode
    /// invocations (`omnigraph-server <URI>` or `--target <name>`)
    /// produce `ServerConfigMode::Single`; multi-mode invocations
    /// (`--config omnigraph.yaml` with a non-empty `graphs:` map and
    /// no single-mode selector) produce `ServerConfigMode::Multi`.
    pub mode: ServerConfigMode,
    /// Address the listener binds to. `load_server_settings` uses the
    /// CLI-supplied bind value when present and otherwise falls back to
    /// `config.server_bind()`.
    pub bind: String,
    /// Operator opt-in for fully-unauthenticated dev mode (MR-723).
    /// When neither bearer tokens nor a policy file are configured,
    /// `serve()` refuses to start unless this is true (set via
    /// `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1`). The
    /// motivation is that "no tokens + no policy" looks like protection
    /// (no Cedar errors at boot) but is actually fully open — operators
    /// who set up auth and forgot the policy file would otherwise ship
    /// the illusion of protection.
    ///
    /// `load_server_settings` treats the env var as true for any value
    /// other than empty, `0` or `false` (case-insensitive, after trimming).
    pub allow_unauthenticated: bool,
}
148
/// Server topology chosen by `load_server_settings` after applying the
/// four-rule mode inference matrix (MR-668 decision 2). Inputs matching
/// none of the rules are rejected with an error.
#[derive(Debug, Clone)]
pub enum ServerConfigMode {
    /// Legacy invocation — one graph at the given URI. Either:
    ///   * `omnigraph-server <URI>` (CLI positional), or
    ///   * `omnigraph-server --target <name> --config omnigraph.yaml`, or
    ///   * `omnigraph-server --config omnigraph.yaml` with `server.graph`
    ///     set to a named target.
    Single {
        /// Graph root URI, already passed through `normalize_root_uri`.
        uri: String,
        /// Top-level `policy.file` (single-graph Cedar policy).
        policy_file: Option<PathBuf>,
    },
    /// Multi-graph invocation — `--config omnigraph.yaml` with a
    /// non-empty `graphs:` map and no single-mode selector.
    Multi {
        /// Per-graph startup configs, sorted by graph id (BTreeMap
        /// iteration order). The parallel-open loop iterates this.
        graphs: Vec<GraphStartupConfig>,
        /// Path to the config file the server was started from. Kept on
        /// the mode so future runtime mutation (deferred — see release
        /// notes) can locate the source of truth without re-parsing CLI
        /// args.
        config_path: PathBuf,
        /// `server.policy.file` (server-level Cedar policy for the
        /// management endpoints). Wired into `GET /graphs` authorization.
        server_policy_file: Option<PathBuf>,
    },
}
179
/// One graph's startup-time configuration: id, opened URI, optional
/// per-graph policy file path. Constructed by `load_server_settings`
/// in multi mode; consumed by `serve`'s parallel open loop.
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
    /// Graph identifier. NOTE(review): presumably the key of the entry in
    /// the config's `graphs:` map — confirm in `load_server_settings`.
    pub graph_id: String,
    /// URI the graph is opened from.
    pub uri: String,
    /// Optional per-graph Cedar policy file
    /// (`graphs.<graph_id>.policy.file`).
    pub policy_file: Option<PathBuf>,
}
189
/// Runtime routing for the server. Single mode = legacy
/// `omnigraph-server <URI>` invocation, one graph, flat HTTP routes.
/// Multi mode = `--config omnigraph.yaml` with a non-empty `graphs:`
/// map, N graphs, cluster routes (`/graphs/{graph_id}/...`). Mode is
/// determined at startup by `load_server_settings`.
///
/// In single mode the handle lives here directly — there is no
/// registry, no sentinel key, no walk-and-assert. In multi mode the
/// registry carries N handles and the middleware dispatches on the
/// URL's `{graph_id}` segment.
///
/// Both modes share the same handler bodies — the routing middleware
/// (`resolve_graph_handle`) injects `Arc<GraphHandle>` as a request
/// extension so handlers never see the routing discriminator.
#[derive(Clone)]
pub enum GraphRouting {
    /// Single-graph deployment: one handle, flat routes (`/snapshot`,
    /// `/read`, …). The `handle.uri` field carries the URI the engine
    /// was opened from. Backward compatible with v0.6.0 deployments.
    Single { handle: Arc<GraphHandle> },
    /// Multi-graph deployment: many handles, cluster routes
    /// (`/graphs/{graph_id}/...`). `config_path` is the `omnigraph.yaml`
    /// the server reads at startup; preserved here so future runtime
    /// mutation (deferred) can find the source of truth without
    /// re-parsing CLI args. The server treats the file as
    /// operator-owned and never writes it. It is optional here because
    /// `AppState::new_multi` accepts the path as an `Option`.
    Multi {
        registry: Arc<GraphRegistry>,
        config_path: Option<PathBuf>,
    },
}
221
/// Shared server state handed to the axum handlers and middleware. It holds
/// graph routing, per-actor admission control, hashed bearer tokens and the
/// optional server-level policy. Most fields are `Arc`-wrapped, so clones
/// share the underlying engines, registry and workload controller.
#[derive(Clone)]
pub struct AppState {
    /// Runtime routing — the single source of truth for where each
    /// request's graph lives. Single mode holds the handle directly;
    /// multi mode holds the registry + config path. Both arms are
    /// the same shape from a handler's perspective: middleware
    /// extracts an `Arc<GraphHandle>` and injects it as a request
    /// extension.
    routing: GraphRouting,
    /// Per-actor admission control. Process-wide (not per-graph) —
    /// see MR-668 decision Q6.
    workload: Arc<workload::WorkloadController>,
    /// `(SHA-256 digest of token, actor name)` pairs built by
    /// `hash_bearer_tokens`; plaintext tokens are never stored. Empty when
    /// no bearer tokens are configured.
    bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
    /// Server-level Cedar policy. Used by the management endpoints that
    /// act on the registry resource rather than on a per-graph resource
    /// (currently `GET /graphs`; the runtime `POST /graphs` endpoint has
    /// been pulled). Loaded from `server.policy.file` in
    /// `omnigraph.yaml`. `None` outside multi mode and when no server
    /// policy is configured. Per-graph policies live on each
    /// `GraphHandle.policy`.
    server_policy: Option<Arc<PolicyEngine>>,
}
243
244struct ExportStreamWriter {
245    sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
246}
247
248impl Write for ExportStreamWriter {
249    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
250        self.sender
251            .send(Ok(Bytes::copy_from_slice(buf)))
252            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
253        Ok(buf.len())
254    }
255
256    fn flush(&mut self) -> io::Result<()> {
257        Ok(())
258    }
259}
260
/// Error type returned by the HTTP handlers. Its `IntoResponse` impl
/// renders it as a JSON `ErrorOutput` body with `status`.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status code of the response.
    status: StatusCode,
    /// Machine-readable error code, echoed in the body's `code` field.
    code: ErrorCode,
    /// Human-readable message, emitted as the body's `error` field.
    message: String,
    /// Per-row merge conflicts. Only the merge-conflict constructor fills
    /// this; every other constructor leaves it empty.
    merge_conflicts: Vec<api::MergeConflictOutput>,
    /// Expected-vs-actual manifest version details. Only set for
    /// manifest version-mismatch conflicts.
    manifest_conflict: Option<api::ManifestConflictOutput>,
}
269
270impl AppState {
271    /// Canonical single-mode constructor. Every other `new_*` / `open_*`
272    /// helper is a thin convenience wrapper around this one. Builds the
273    /// engine + per-graph policy through `build_single_mode`, which
274    /// applies `Omnigraph::with_policy` so HTTP-layer and engine-layer
275    /// policy can never diverge — there is no "policy installed on HTTP
276    /// but not on engine" representable state (closes the prior
277    /// `with_policy_engine` footgun that reused the engine `Arc`
278    /// without re-applying `with_policy`).
279    pub fn new_single(
280        uri: String,
281        db: Omnigraph,
282        bearer_tokens: Vec<(String, String)>,
283        policy_engine: Option<PolicyEngine>,
284        workload: workload::WorkloadController,
285    ) -> Self {
286        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
287        let per_graph_policy = policy_engine.map(Arc::new);
288        Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload))
289    }
290
291    pub fn new(uri: String, db: Omnigraph) -> Self {
292        Self::new_single(
293            uri,
294            db,
295            Vec::new(),
296            None,
297            workload::WorkloadController::from_env(),
298        )
299    }
300
301    pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
302        let bearer_tokens = normalize_bearer_token(bearer_token)
303            .into_iter()
304            .map(|token| ("default".to_string(), token))
305            .collect();
306        Self::new_with_bearer_tokens(uri, db, bearer_tokens)
307    }
308
309    pub fn new_with_bearer_tokens(
310        uri: String,
311        db: Omnigraph,
312        bearer_tokens: Vec<(String, String)>,
313    ) -> Self {
314        Self::new_single(
315            uri,
316            db,
317            bearer_tokens,
318            None,
319            workload::WorkloadController::from_env(),
320        )
321    }
322
323    pub fn new_with_bearer_tokens_and_policy(
324        uri: String,
325        db: Omnigraph,
326        bearer_tokens: Vec<(String, String)>,
327        policy_engine: Option<PolicyEngine>,
328    ) -> Self {
329        Self::new_single(
330            uri,
331            db,
332            bearer_tokens,
333            policy_engine,
334            workload::WorkloadController::from_env(),
335        )
336    }
337
338    /// Construct with a caller-provided [`workload::WorkloadController`].
339    /// Tests and benches use this to override per-actor caps without
340    /// mutating global env vars (unsafe in Rust 2024 once the async
341    /// runtime is up — `setenv` isn't thread-safe). For tests that also
342    /// need a custom `PolicyEngine`, use [`new_single`] directly.
343    pub fn new_with_workload(
344        uri: String,
345        db: Omnigraph,
346        bearer_tokens: Vec<(String, String)>,
347        workload: workload::WorkloadController,
348    ) -> Self {
349        Self::new_single(uri, db, bearer_tokens, None, workload)
350    }
351
352    pub async fn open(uri: impl Into<String>) -> Result<Self> {
353        Self::open_with_bearer_token(uri, None).await
354    }
355
356    pub async fn open_with_bearer_token(
357        uri: impl Into<String>,
358        bearer_token: Option<String>,
359    ) -> Result<Self> {
360        let bearer_tokens = normalize_bearer_token(bearer_token)
361            .into_iter()
362            .map(|token| ("default".to_string(), token))
363            .collect();
364        Self::open_with_bearer_tokens(uri, bearer_tokens).await
365    }
366
367    pub async fn open_with_bearer_tokens(
368        uri: impl Into<String>,
369        bearer_tokens: Vec<(String, String)>,
370    ) -> Result<Self> {
371        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
372        let db = Omnigraph::open(&uri).await?;
373        Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
374    }
375
376    pub async fn open_with_bearer_tokens_and_policy(
377        uri: impl Into<String>,
378        bearer_tokens: Vec<(String, String)>,
379        policy_file: Option<&PathBuf>,
380    ) -> Result<Self> {
381        // The "policy requires tokens" invariant is enforced once by
382        // `classify_server_runtime_state` in `serve()`, before either
383        // single-mode or multi-mode construction is reached. By the
384        // time we get here, the (policy, no-tokens) combination has
385        // already been rejected — no second bail needed.
386        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
387        let db = Omnigraph::open(&uri).await?;
388        let policy_engine = match policy_file {
389            Some(path) => Some(PolicyEngine::load_graph(path, &uri)?),
390            None => None,
391        };
392        Ok(Self::new_with_bearer_tokens_and_policy(
393            uri,
394            db,
395            bearer_tokens,
396            policy_engine,
397        ))
398    }
399
400    /// Single-mode shared construction: wraps the bare engine + per-graph
401    /// policy in a `GraphHandle` carried directly by `GraphRouting::Single`.
402    /// Per-graph policy enforcement on the engine (MR-722) is re-applied
403    /// via `Omnigraph::with_policy` so HTTP and engine layers can never
404    /// diverge.
405    fn build_single_mode(
406        uri: String,
407        db: Omnigraph,
408        bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
409        policy_engine: Option<Arc<PolicyEngine>>,
410        workload: Arc<workload::WorkloadController>,
411    ) -> Self {
412        // Engine-layer policy gate (MR-722). With a per-graph policy
413        // installed, every `_as` writer on `Omnigraph` calls into the
414        // PolicyChecker. HTTP-layer `authorize_request` is the first
415        // gate; engine-layer is the redundant-but-correct backstop.
416        let db = if let Some(policy) = policy_engine.as_ref() {
417            let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
418            db.with_policy(checker)
419        } else {
420            db
421        };
422        // `GraphHandle.key` is required by the struct, but in single
423        // mode it is never a registry key (there's no registry) and
424        // never compared against user input (routes are flat, no
425        // `{graph_id}` parameter). The label appears only in tracing
426        // output from `resolve_graph_handle`. The literal below is a
427        // log label, not a routing key — when the future cluster
428        // catalog ships, single mode may carry the catalog-assigned
429        // id here instead.
430        let uri = normalize_root_uri(&uri).unwrap_or(uri);
431        let key = GraphKey::cluster(
432            GraphId::try_from("default").expect("'default' is a valid GraphId log label"),
433        );
434        let handle = Arc::new(GraphHandle {
435            key,
436            uri,
437            engine: Arc::new(db),
438            policy: policy_engine,
439        });
440        Self {
441            routing: GraphRouting::Single { handle },
442            workload,
443            bearer_tokens,
444            server_policy: None,
445        }
446    }
447
448    /// Multi-mode constructor — used by the startup loop. Operators
449    /// reach this by invoking `omnigraph-server --config omnigraph.yaml`
450    /// with a non-empty `graphs:` map.
451    ///
452    /// Caller supplies the already-opened `GraphHandle`s and (optionally)
453    /// the path to the source config file. `server_policy` is loaded
454    /// from `server.policy.file` if configured.
455    pub fn new_multi(
456        handles: Vec<Arc<GraphHandle>>,
457        bearer_tokens: Vec<(String, String)>,
458        server_policy: Option<PolicyEngine>,
459        workload: workload::WorkloadController,
460        config_path: Option<PathBuf>,
461    ) -> std::result::Result<Self, InsertError> {
462        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
463        let registry = Arc::new(GraphRegistry::from_handles(handles)?);
464        Ok(Self {
465            routing: GraphRouting::Multi {
466                registry,
467                config_path,
468            },
469            workload: Arc::new(workload),
470            bearer_tokens,
471            server_policy: server_policy.map(Arc::new),
472        })
473    }
474
475    /// Runtime routing accessor. Handlers don't typically inspect this —
476    /// they extract `Arc<GraphHandle>` via the routing middleware — but
477    /// `build_app` matches on it to decide flat vs nested route
478    /// mounting, and a handful of management endpoints (`GET /graphs`,
479    /// the OpenAPI cluster rewrite) match on the discriminant.
480    pub fn routing(&self) -> &GraphRouting {
481        &self.routing
482    }
483
484    fn requires_bearer_auth(&self) -> bool {
485        if !self.bearer_tokens.is_empty() {
486            return true;
487        }
488        if self.server_policy.is_some() {
489            return true;
490        }
491        // Any per-graph policy also requires auth — otherwise the
492        // policy gate would receive unauthenticated requests. Reading
493        // from `routing` is O(1) in both arms: single mode is a direct
494        // `handle.policy.is_some()` check, multi mode reads the
495        // cached `any_per_graph_policy` flag on the registry snapshot.
496        match &self.routing {
497            GraphRouting::Single { handle } => handle.policy.is_some(),
498            GraphRouting::Multi { registry, .. } => registry.snapshot_ref().any_per_graph_policy,
499        }
500    }
501
502    fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
503        // Hash the incoming token and compare against every stored digest in
504        // constant time. Iterate all entries unconditionally so total work —
505        // and therefore response timing — doesn't depend on which slot matches.
506        let provided_hash = hash_bearer_token(provided_token);
507        let mut matched: Option<Arc<str>> = None;
508        for (hash, actor) in self.bearer_tokens.iter() {
509            if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
510                matched = Some(Arc::clone(actor));
511            }
512        }
513        matched.map(ResolvedActor::cluster_static)
514    }
515}
516
517fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
518    let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
519        .into_iter()
520        .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
521        .collect();
522    Arc::from(tokens)
523}
524
525impl ApiError {
526    pub fn unauthorized(message: impl Into<String>) -> Self {
527        Self {
528            status: StatusCode::UNAUTHORIZED,
529            code: ErrorCode::Unauthorized,
530            message: message.into(),
531            merge_conflicts: Vec::new(),
532            manifest_conflict: None,
533        }
534    }
535
536    pub fn forbidden(message: impl Into<String>) -> Self {
537        Self {
538            status: StatusCode::FORBIDDEN,
539            code: ErrorCode::Forbidden,
540            message: message.into(),
541            merge_conflicts: Vec::new(),
542            manifest_conflict: None,
543        }
544    }
545
546    pub fn bad_request(message: impl Into<String>) -> Self {
547        Self {
548            status: StatusCode::BAD_REQUEST,
549            code: ErrorCode::BadRequest,
550            message: message.into(),
551            merge_conflicts: Vec::new(),
552            manifest_conflict: None,
553        }
554    }
555
556    pub fn not_found(message: impl Into<String>) -> Self {
557        Self {
558            status: StatusCode::NOT_FOUND,
559            code: ErrorCode::NotFound,
560            message: message.into(),
561            merge_conflicts: Vec::new(),
562            manifest_conflict: None,
563        }
564    }
565
566    /// HTTP 405 Method Not Allowed. Used when the route is mounted but
567    /// the active server mode doesn't serve it (`GET /graphs` in
568    /// single-graph mode returns this instead of 404 so clients can
569    /// distinguish "wrong context" from "no such resource").
570    pub fn method_not_allowed(message: impl Into<String>) -> Self {
571        Self {
572            status: StatusCode::METHOD_NOT_ALLOWED,
573            code: ErrorCode::MethodNotAllowed,
574            message: message.into(),
575            merge_conflicts: Vec::new(),
576            manifest_conflict: None,
577        }
578    }
579
580    pub fn conflict(message: impl Into<String>) -> Self {
581        Self {
582            status: StatusCode::CONFLICT,
583            code: ErrorCode::Conflict,
584            message: message.into(),
585            merge_conflicts: Vec::new(),
586            manifest_conflict: None,
587        }
588    }
589
590    pub fn internal(message: impl Into<String>) -> Self {
591        Self {
592            status: StatusCode::INTERNAL_SERVER_ERROR,
593            code: ErrorCode::Internal,
594            message: message.into(),
595            merge_conflicts: Vec::new(),
596            manifest_conflict: None,
597        }
598    }
599
600    /// HTTP 429 Too Many Requests — actor exceeded their per-actor
601    /// admission cap (count or byte budget). Clients should respect the
602    /// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded`
603    /// and `RejectReason::ByteBudgetExceeded`.
604    pub fn too_many_requests(message: impl Into<String>) -> Self {
605        Self {
606            status: StatusCode::TOO_MANY_REQUESTS,
607            code: ErrorCode::TooManyRequests,
608            message: message.into(),
609            merge_conflicts: Vec::new(),
610            manifest_conflict: None,
611        }
612    }
613
614    /// Convert a `WorkloadController` rejection into the matching
615    /// `ApiError` variant.
616    pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
617        match reject {
618            workload::RejectReason::InFlightCountExceeded { .. }
619            | workload::RejectReason::ByteBudgetExceeded { .. } => {
620                Self::too_many_requests(reject.to_string())
621            }
622        }
623    }
624
625    fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
626        Self {
627            status: StatusCode::CONFLICT,
628            code: ErrorCode::Conflict,
629            message: summarize_merge_conflicts(&conflicts),
630            merge_conflicts: conflicts,
631            manifest_conflict: None,
632        }
633    }
634
635    fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
636        Self {
637            status: StatusCode::CONFLICT,
638            code: ErrorCode::Conflict,
639            message,
640            merge_conflicts: Vec::new(),
641            manifest_conflict: Some(details),
642        }
643    }
644
645    fn from_omni(err: OmniError) -> Self {
646        match err {
647            OmniError::Compiler(err) => Self::bad_request(err.to_string()),
648            OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
649            OmniError::Manifest(err) => match err.kind {
650                ManifestErrorKind::BadRequest => Self::bad_request(err.message),
651                ManifestErrorKind::NotFound => Self::not_found(err.message),
652                ManifestErrorKind::Conflict => match err.details {
653                    Some(ManifestConflictDetails::ExpectedVersionMismatch {
654                        table_key,
655                        expected,
656                        actual,
657                    }) => Self::manifest_version_conflict(
658                        err.message,
659                        api::ManifestConflictOutput {
660                            table_key,
661                            expected,
662                            actual,
663                        },
664                    ),
665                    _ => Self::conflict(err.message),
666                },
667                ManifestErrorKind::Internal => Self::internal(err.message),
668            },
669            OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
670                conflicts
671                    .iter()
672                    .map(api::MergeConflictOutput::from)
673                    .collect(),
674            ),
675            OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
676            OmniError::Io(err) => Self::internal(format!("io: {err}")),
677            // Engine-layer policy enforcement (MR-722). All denials and
678            // evaluation failures surface here as 403. The HTTP-layer
679            // `authorize_request` already distinguishes 401 (missing
680            // bearer) from 403 (policy denial), so by the time the
681            // engine gate fires, the bearer is valid — any failure from
682            // the engine is a policy outcome, not an auth one.
683            OmniError::Policy(message) => Self::forbidden(message),
684            // `Omnigraph::init` against an existing graph URI in strict
685            // mode. Not currently HTTP-reachable (POST /graphs was
686            // pulled), but mapping is wired so the variant has a
687            // single canonical translation when a future runtime
688            // create endpoint lands.
689            err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
690        }
691    }
692}
693
694fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
695    if conflicts.is_empty() {
696        return "merge conflicts".to_string();
697    }
698
699    let preview: Vec<String> = conflicts
700        .iter()
701        .take(3)
702        .map(|conflict| match conflict.row_id.as_deref() {
703            Some(row_id) => format!(
704                "{}:{} ({})",
705                conflict.table_key,
706                row_id,
707                conflict.kind.as_str()
708            ),
709            None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
710        })
711        .collect();
712
713    let suffix = if conflicts.len() > preview.len() {
714        format!("; and {} more", conflicts.len() - preview.len())
715    } else {
716        String::new()
717    };
718
719    format!("merge conflicts: {}{}", preview.join("; "), suffix)
720}
721
722/// Constant `Retry-After` value (seconds) emitted on 429 responses.
723const RETRY_AFTER_SECONDS: &str = "60";
724
725impl IntoResponse for ApiError {
726    fn into_response(self) -> Response {
727        let mut headers = axum::http::HeaderMap::new();
728        if matches!(self.code, ErrorCode::TooManyRequests) {
729            headers.insert(
730                axum::http::header::RETRY_AFTER,
731                axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
732            );
733        }
734        (
735            self.status,
736            headers,
737            Json(ErrorOutput {
738                error: self.message,
739                code: Some(self.code),
740                merge_conflicts: self.merge_conflicts,
741                manifest_conflict: self.manifest_conflict,
742            }),
743        )
744            .into_response()
745    }
746}
747
748pub fn init_tracing() {
749    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
750    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
751}
752
753pub fn load_server_settings(
754    config_path: Option<&PathBuf>,
755    cli_uri: Option<String>,
756    cli_target: Option<String>,
757    cli_bind: Option<String>,
758    cli_allow_unauthenticated: bool,
759) -> Result<ServerConfig> {
760    let config = load_config(config_path)?;
761    let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
762    // Either `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1` flips
763    // this. Treat any non-empty, non-"0"/"false" string as truthy —
764    // standard 12-factor "any value is true" reading of the env var.
765    let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
766        .ok()
767        .map(|v| {
768            let trimmed = v.trim();
769            !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
770        })
771        .unwrap_or(false);
772    let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
773
774    // MR-668 decision 2 — four-rule mode inference matrix.
775    //
776    //   1. CLI `<URI>` positional        → Single (URI = the value)
777    //   2. CLI `--target <name>`         → Single (URI = graphs.<name>.uri)
778    //   3. `server.graph` in config      → Single (URI = graphs.<server.graph>.uri)
779    //   4. `--config` + non-empty `graphs:` + no single-mode selector
780    //                                    → Multi (every entry in `graphs:`)
781    //   5. otherwise                     → error with migration hint
782    //
783    // Rules 1-3 are mutually compatible (CLI URI wins over `--target`
784    // wins over `server.graph`), reusing the existing
785    // `resolve_target_uri` precedence.
786    let has_cli_uri = cli_uri.is_some();
787    let has_cli_target = cli_target.is_some();
788    let has_server_graph = config.server_graph_name().is_some();
789    let has_graphs_map = !config.graphs.is_empty();
790    let has_explicit_config = config_path.is_some();
791
792    let mode = if has_cli_uri || has_cli_target || has_server_graph {
793        // Rules 1, 2, or 3 → Single mode.
794        let raw_uri = config.resolve_target_uri(
795            cli_uri,
796            cli_target.as_deref(),
797            config.server_graph_name(),
798        )?;
799        let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
800            format!("normalize single-graph URI '{raw_uri}' from server settings")
801        })?;
802        let policy_file = config.resolve_policy_file();
803        ServerConfigMode::Single { uri, policy_file }
804    } else if has_explicit_config && has_graphs_map {
805        if config.resolve_policy_file().is_some() {
806            bail!(
807                "top-level `policy.file` is single-graph/CLI-local policy only; \
808                 in multi-graph mode move per-graph rules to \
809                 `graphs.<graph_id>.policy.file` and move `graph_list` rules to \
810                 `server.policy.file`."
811            );
812        }
813        // Rule 4 → Multi mode. Build a startup config per graph.
814        let mut graphs = Vec::with_capacity(config.graphs.len());
815        for (name, target) in &config.graphs {
816            // Validate the graph id can construct a `GraphId` newtype.
817            // Doing this here (not at registry insert) so a malformed
818            // omnigraph.yaml fails at startup with a clear error.
819            GraphId::try_from(name.clone()).map_err(|err| {
820                color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
821            })?;
822            let raw_uri = config.resolve_uri_value(&target.uri);
823            let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
824                format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
825            })?;
826            graphs.push(GraphStartupConfig {
827                graph_id: name.clone(),
828                uri,
829                policy_file: config.resolve_target_policy_file(name),
830            });
831        }
832        let config_path = config_path
833            .cloned()
834            .expect("has_explicit_config implies config_path is Some");
835        let server_policy_file = config.resolve_server_policy_file();
836        ServerConfigMode::Multi {
837            graphs,
838            config_path,
839            server_policy_file,
840        }
841    } else {
842        // Rule 5 → error with migration hint.
843        bail!(
844            "no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
845             (`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
846             omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
847             file referenced by `--config`."
848        );
849    };
850
851    Ok(ServerConfig {
852        mode,
853        bind,
854        allow_unauthenticated,
855    })
856}
857
858/// Whether the loaded config will run the server in multi-graph mode.
859/// Useful for the test that constructs `ServerConfig` directly.
860pub fn server_config_is_multi(config: &ServerConfig) -> bool {
861    matches!(config.mode, ServerConfigMode::Multi { .. })
862}
863
/// Runtime posture of the server (MR-723), derived at startup from two
/// inputs: whether any bearer tokens are configured and whether any
/// policy file is configured. `classify_server_runtime_state` performs
/// the mapping, including the combinations that refuse to start.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ServerRuntimeState {
    /// Neither tokens nor policy. Reachable only with an explicit
    /// `allow_unauthenticated` opt-in — a "trust the network" dev mode;
    /// `serve()` refuses to start in this shape without the flag.
    Open,
    /// Tokens configured but no policy file. A valid bearer token is
    /// required, and once authenticated every action except `Read` is
    /// denied with 403. Closes the "tokens but forgot the policy file"
    /// trap.
    DefaultDeny,
    /// Policy file configured together with at least one bearer token;
    /// Cedar evaluates every authenticated request. Policy without
    /// tokens is rejected at startup because such a server would 401
    /// every request. Operators who want to deny all unauthenticated
    /// traffic should configure tokens plus a deny-all policy, which
    /// yields meaningful 403s with policy-decision logging.
    PolicyEnabled,
}
890
891/// Compute the [`ServerRuntimeState`] from the configured inputs.
892/// Pulled out as a pure function so the matrix is unit-testable
893/// without standing up the full server.
894///
895/// The classifier is the **single source of truth** for "should we
896/// start?" — both `serve()`'s single-mode and multi-mode branches
897/// call this before constructing their `AppState`. Adding a startup
898/// invariant here means both modes enforce it automatically; the
899/// alternative (per-constructor `bail!`) drifts the moment a third
900/// mode is added.
901pub fn classify_server_runtime_state(
902    has_tokens: bool,
903    has_policy: bool,
904    allow_unauthenticated: bool,
905) -> Result<ServerRuntimeState> {
906    match (has_tokens, has_policy, allow_unauthenticated) {
907        (false, false, false) => bail!(
908            "server has no bearer tokens and no policy file configured. This is a fully \
909             open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
910             if you actually want that, otherwise configure bearer tokens (see \
911             docs/user/server.md) and/or `policy.file` in omnigraph.yaml."
912        ),
913        (false, false, true) => Ok(ServerRuntimeState::Open),
914        (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
915        (false, true, _) => bail!(
916            "policy file is configured but no bearer tokens — every request would 401 \
917             because no token can ever match. Configure at least one bearer token (see \
918             docs/user/server.md), or remove the policy file. To deny all unauthenticated \
919             traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
920             produces meaningful 403s with policy-decision logging instead of silent 401s."
921        ),
922        (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
923    }
924}
925
926pub fn build_app(state: AppState) -> Router {
927    // The per-graph protected routes, identical in single + multi mode.
928    // Two middleware layers wrap them (outer first, inner last):
929    //   1. `require_bearer_auth` — extracts the bearer token and injects
930    //      `ResolvedActor` (or rejects 401).
931    //   2. `resolve_graph_handle` — injects `Arc<GraphHandle>` based on
932    //      the active mode (single: the only handle; multi: lookup by
933    //      `{graph_id}` in the URI path).
934    let per_graph_protected = Router::new()
935        .route("/snapshot", get(server_snapshot))
936        .route("/export", post(server_export))
937        // /read and /change are kept indefinitely for back-compat;
938        // their handlers carry #[deprecated] so the OpenAPI operation is
939        // flagged and their responses include RFC 9745 Deprecation +
940        // RFC 8288 Link headers. Suppress the call-site warning for the
941        // route registration itself.
942        .route("/read", post({
943            #[allow(deprecated)]
944            server_read
945        }))
946        .route("/query", post(server_query))
947        .route("/change", post({
948            #[allow(deprecated)]
949            server_change
950        }))
951        .route("/mutate", post(server_mutate))
952        .route("/schema", get(server_schema_get))
953        .route("/schema/apply", post(server_schema_apply))
954        .route(
955            "/ingest",
956            post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
957        )
958        .route(
959            "/branches",
960            get(server_branch_list).post(server_branch_create),
961        )
962        .route("/branches/{branch}", delete(server_branch_delete))
963        .route("/branches/merge", post(server_branch_merge))
964        .route("/commits", get(server_commit_list))
965        .route("/commits/{commit_id}", get(server_commit_show))
966        .route_layer(middleware::from_fn_with_state(
967            state.clone(),
968            resolve_graph_handle,
969        ))
970        .route_layer(middleware::from_fn_with_state(
971            state.clone(),
972            require_bearer_auth,
973        ));
974
975    // Management endpoints (`GET /graphs`) live alongside the per-graph
976    // router. They go through bearer auth but NOT through
977    // `resolve_graph_handle` — they operate on the registry directly.
978    // The endpoint is mounted in both modes; in single mode the handler
979    // returns 405 so clients see "resource exists, wrong context"
980    // rather than 404 "no such resource."
981    //
982    // Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not
983    // exposed in v0.6.0 — operators add graphs by editing
984    // `omnigraph.yaml` and restarting.
985    let management = Router::new()
986        .route("/graphs", get(server_graphs_list))
987        .route_layer(middleware::from_fn_with_state(
988            state.clone(),
989            require_bearer_auth,
990        ));
991
992    // Mount the protected routes differently per mode:
993    //   * Single → flat routes (legacy: `/snapshot`, `/read`, etc.)
994    //   * Multi  → nested under `/graphs/{graph_id}/...`
995    let protected: Router<AppState> = match state.routing() {
996        GraphRouting::Single { .. } => per_graph_protected.merge(management),
997        GraphRouting::Multi { .. } => Router::new()
998            .nest("/graphs/{graph_id}", per_graph_protected)
999            .merge(management),
1000    };
1001
1002    Router::new()
1003        .route("/healthz", get(server_health))
1004        .route("/openapi.json", get(server_openapi))
1005        .merge(protected)
1006        .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
1007        .layer(TraceLayer::new_for_http())
1008        .with_state(state)
1009}
1010
1011pub async fn serve(config: ServerConfig) -> Result<()> {
1012    let token_source = resolve_token_source().await?;
1013    info!(source = token_source.name(), "loaded bearer token source");
1014    let tokens = token_source.load().await?;
1015
1016    // For runtime-state classification, "any policy configured" means
1017    // either the top-level/single-mode policy file OR a server-level
1018    // policy OR any per-graph policy file. Mirrors the
1019    // `requires_bearer_auth` semantics on AppState.
1020    let has_policy_configured = match &config.mode {
1021        ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
1022        ServerConfigMode::Multi {
1023            graphs,
1024            server_policy_file,
1025            ..
1026        } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
1027    };
1028    let runtime_state = classify_server_runtime_state(
1029        !tokens.is_empty(),
1030        has_policy_configured,
1031        config.allow_unauthenticated,
1032    )?;
1033    match runtime_state {
1034        ServerRuntimeState::Open => warn!(
1035            "running with --unauthenticated: no bearer tokens, no policy file, all \
1036             requests permitted. This is for local dev only — do not expose to a \
1037             network you don't fully trust."
1038        ),
1039        ServerRuntimeState::DefaultDeny => warn!(
1040            "bearer tokens are configured but no policy file is set — running in \
1041             default-deny mode (only `read` actions are permitted for authenticated \
1042             actors). Configure `policy.file` in omnigraph.yaml to enable Cedar rules."
1043        ),
1044        ServerRuntimeState::PolicyEnabled => {}
1045    }
1046
1047    let bind = config.bind.clone();
1048    let state = match config.mode {
1049        ServerConfigMode::Single { uri, policy_file } => {
1050            let uri_for_log = uri.clone();
1051            info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph");
1052            AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await?
1053        }
1054        ServerConfigMode::Multi {
1055            graphs,
1056            config_path,
1057            server_policy_file,
1058        } => {
1059            info!(
1060                bind = %bind,
1061                mode = "multi",
1062                graph_count = graphs.len(),
1063                config = %config_path.display(),
1064                "serving omnigraph"
1065            );
1066            open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
1067        }
1068    };
1069
1070    let listener = TcpListener::bind(&bind).await?;
1071    axum::serve(listener, build_app(state))
1072        .with_graceful_shutdown(shutdown_signal())
1073        .await?;
1074    Ok(())
1075}
1076
1077/// Parallel open of every graph in the startup config, with bounded
1078/// concurrency (`buffer_unordered(4)`). Fail-fast — the first open error
1079/// aborts startup; other in-flight opens are dropped (their `Omnigraph`
1080/// instances close cleanly via Arc drop).
1081///
1082/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
1083/// trades startup latency for a small amount of concurrent S3 / Lance
1084/// open pressure.
1085async fn open_multi_graph_state(
1086    graphs: Vec<GraphStartupConfig>,
1087    tokens: Vec<(String, String)>,
1088    server_policy_file: Option<&PathBuf>,
1089    config_path: PathBuf,
1090) -> Result<AppState> {
1091    use futures::{StreamExt, TryStreamExt};
1092
1093    if graphs.is_empty() {
1094        bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1095    }
1096
1097    // Server-level policy (loaded once, applies to management endpoints).
1098    // The placeholder graph_id `"server"` is the sentinel the Cedar
1099    // resource-model refactor maps to the singleton
1100    // `Omnigraph::Server::"root"` entity at evaluation time.
1101    let server_policy = match server_policy_file {
1102        Some(path) => Some(PolicyEngine::load_server(path)?),
1103        None => None,
1104    };
1105
1106    // `try_collect` propagates the first error eagerly, dropping every
1107    // in-flight open. `buffer_unordered + collect::<Vec<_>>` would drain
1108    // the stream before checking errors — incorrect for the docstring's
1109    // "fail-fast" claim and wasteful on S3-backed graphs.
1110    let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
1111        .map(|cfg| async move { open_single_graph(cfg).await })
1112        .buffer_unordered(4)
1113        .try_collect()
1114        .await?;
1115
1116    let workload = workload::WorkloadController::from_env();
1117    let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1118        .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1119    Ok(state)
1120}
1121
1122/// Open one graph and wrap it in a `GraphHandle`. Used at startup by
1123/// `open_multi_graph_state`.
1124async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1125    let graph_id = GraphId::try_from(cfg.graph_id.clone())
1126        .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1127    let uri = normalize_root_uri(&cfg.uri)
1128        .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1129
1130    let db = Omnigraph::open(&uri)
1131        .await
1132        .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1133
1134    let (policy_arc, db) = match &cfg.policy_file {
1135        Some(path) => {
1136            let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
1137            let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1138            let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1139            (Some(policy_arc), db.with_policy(checker))
1140        }
1141        None => (None, db),
1142    };
1143
1144    Ok(Arc::new(GraphHandle {
1145        key: GraphKey::cluster(graph_id),
1146        uri,
1147        engine: Arc::new(db),
1148        policy: policy_arc,
1149    }))
1150}
1151
1152async fn shutdown_signal() {
1153    if let Err(err) = tokio::signal::ctrl_c().await {
1154        error!(error = %err, "failed to install ctrl-c handler");
1155        return;
1156    }
1157    info!("shutdown signal received");
1158}
1159
1160#[utoipa::path(
1161    get,
1162    path = "/healthz",
1163    tag = "health",
1164    operation_id = "health",
1165    responses(
1166        (status = 200, description = "Server is healthy", body = HealthOutput),
1167    ),
1168)]
1169/// Liveness probe.
1170///
1171/// Returns server status and version. Unauthenticated; safe to call from any
1172/// caller. Use this to confirm the server is reachable before invoking other
1173/// endpoints.
1174async fn server_health() -> Json<HealthOutput> {
1175    Json(HealthOutput {
1176        status: "ok".to_string(),
1177        version: SERVER_VERSION.to_string(),
1178        source_version: SERVER_SOURCE_VERSION.map(str::to_string),
1179    })
1180}
1181
1182#[utoipa::path(
1183    get,
1184    path = "/graphs",
1185    tag = "management",
1186    operation_id = "listGraphs",
1187    responses(
1188        (status = 200, description = "List of registered graphs", body = GraphListResponse),
1189        (status = 401, description = "Unauthorized", body = ErrorOutput),
1190        (status = 403, description = "Forbidden", body = ErrorOutput),
1191        (status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput),
1192    ),
1193    security(("bearer_token" = [])),
1194)]
1195/// List every graph currently registered with this server (MR-668).
1196///
1197/// Multi-graph mode only. In single mode, the route returns 405 — there's
1198/// no registry to enumerate. Cedar-gated by the server-level policy via
1199/// the `graph_list` action against `Omnigraph::Server::"root"`.
1200///
1201/// Order: alphabetical by `graph_id` (server-sorted so clients see
1202/// deterministic output across requests).
1203async fn server_graphs_list(
1204    State(state): State<AppState>,
1205    actor: Option<Extension<ResolvedActor>>,
1206) -> std::result::Result<Json<GraphListResponse>, ApiError> {
1207    // 405 in single mode — there's no registry to enumerate, and the
1208    // legacy URL surface didn't expose this endpoint.
1209    let registry = match state.routing() {
1210        GraphRouting::Single { .. } => {
1211            return Err(ApiError::method_not_allowed(
1212                "GET /graphs is only available in multi-graph mode",
1213            ));
1214        }
1215        GraphRouting::Multi { registry, .. } => registry,
1216    };
1217
1218    // Server-level Cedar gate. `state.server_policy` is loaded from
1219    // `server.policy.file` in `omnigraph.yaml` at startup. When no
1220    // server policy is configured, `authorize_request_server` falls
1221    // through to the MR-723 default-deny semantics (every non-Read
1222    // action denied for an authenticated actor). `GraphList` is not
1223    // `Read`, so without a server policy the request gets 403 — which
1224    // is the right default (don't leak the registry until the operator
1225    // explicitly authorizes it).
1226    authorize_request(
1227        actor.as_ref().map(|Extension(actor)| actor),
1228        state.server_policy.as_deref(),
1229        PolicyRequest {
1230            action: PolicyAction::GraphList,
1231            branch: None,
1232            target_branch: None,
1233        },
1234    )?;
1235
1236    let mut graphs: Vec<GraphInfo> = registry
1237        .list()
1238        .into_iter()
1239        .map(|handle| GraphInfo {
1240            graph_id: handle.key.graph_id.as_str().to_string(),
1241            uri: handle.uri.clone(),
1242        })
1243        .collect();
1244    graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
1245    Ok(Json(GraphListResponse { graphs }))
1246}
1247
1248async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
1249    let mut doc = ApiDoc::openapi();
1250    if !state.requires_bearer_auth() {
1251        strip_security(&mut doc);
1252    }
1253    // MR-668: in multi mode, the protected routes live under
1254    // `/graphs/{graph_id}/...`. Rewrite the doc so the spec matches
1255    // the routes the router actually serves. Public paths (`/healthz`)
1256    // stay flat in both modes.
1257    if matches!(state.routing(), GraphRouting::Multi { .. }) {
1258        nest_paths_under_cluster_prefix(&mut doc);
1259    }
1260    Json(doc)
1261}
1262
/// Path prefix used to namespace per-graph routes in multi mode.
/// Must match the prefix `build_app` nests the per-graph router under.
/// `resolve_graph_handle` also parses the literal `/graphs/` segment out
/// of request paths, so changing this prefix means updating that code too.
const CLUSTER_PATH_PREFIX: &str = "/graphs/{graph_id}";

/// Operation-id prefix applied to every operation that the multi-mode
/// OpenAPI rewrite moves under the cluster prefix. Decision 7 in the
/// implementation plan: keeps operation IDs unique across the spec if
/// flat and nested variants ever appear in the same generation pass.
const CLUSTER_OPERATION_ID_PREFIX: &str = "cluster_";

/// Paths that stay flat in every server mode (public or server-level,
/// with no per-graph dependency). Update this list when adding new
/// always-flat endpoints. `/graphs` is the management enumeration: it
/// lives at the root in both single mode (405) and multi mode, and must
/// never be rewritten to `/graphs/{graph_id}/graphs`.
const ALWAYS_FLAT_PATHS: &[&str] = &["/healthz", "/graphs"];
1279
1280/// In multi-mode `server_openapi`, every protected path-item is
1281/// reattached under the cluster prefix. Operation IDs gain the
1282/// `cluster_` prefix so SDK generators don't collide if/when both
1283/// surfaces are merged. Every rewritten operation also declares the
1284/// required `{graph_id}` path parameter so the served OpenAPI document
1285/// remains internally valid.
1286///
1287/// Removing the flat protected paths matches the runtime router —
1288/// in multi mode, requests to `/snapshot` etc. return 404, so the
1289/// spec must agree.
1290fn nest_paths_under_cluster_prefix(doc: &mut utoipa::openapi::OpenApi) {
1291    let original = std::mem::take(&mut doc.paths.paths);
1292    let mut rewritten = std::collections::BTreeMap::new();
1293    for (path, mut item) in original {
1294        if ALWAYS_FLAT_PATHS.contains(&path.as_str()) {
1295            rewritten.insert(path, item);
1296            continue;
1297        }
1298        rename_operation_ids(&mut item, CLUSTER_OPERATION_ID_PREFIX);
1299        add_cluster_graph_id_parameter(&mut item);
1300        let new_path = format!("{CLUSTER_PATH_PREFIX}{path}");
1301        rewritten.insert(new_path, item);
1302    }
1303    doc.paths.paths = rewritten;
1304}
1305
1306fn add_cluster_graph_id_parameter(item: &mut utoipa::openapi::PathItem) {
1307    for op in path_item_operations_mut(item) {
1308        let parameters = op.parameters.get_or_insert_with(Vec::new);
1309        let has_graph_id = parameters
1310            .iter()
1311            .any(|param| param.name == "graph_id" && param.parameter_in == ParameterIn::Path);
1312        if !has_graph_id {
1313            parameters.insert(0, graph_id_path_parameter());
1314        }
1315    }
1316}
1317
1318fn graph_id_path_parameter() -> Parameter {
1319    let mut parameter = Parameter::new("graph_id");
1320    parameter.parameter_in = ParameterIn::Path;
1321    parameter.description = Some("Graph id to route the request to.".to_string());
1322    parameter.schema = Some(Object::with_type(Type::String).into());
1323    parameter
1324}
1325
1326/// Prefix every operation_id in this PathItem with `prefix`.
1327fn rename_operation_ids(item: &mut utoipa::openapi::PathItem, prefix: &str) {
1328    for op in path_item_operations_mut(item) {
1329        if let Some(id) = op.operation_id.as_deref() {
1330            op.operation_id = Some(format!("{prefix}{id}"));
1331        }
1332    }
1333}
1334
1335fn path_item_operations_mut(
1336    item: &mut utoipa::openapi::PathItem,
1337) -> impl Iterator<Item = &mut utoipa::openapi::path::Operation> {
1338    [
1339        item.get.as_mut(),
1340        item.post.as_mut(),
1341        item.put.as_mut(),
1342        item.delete.as_mut(),
1343        item.options.as_mut(),
1344        item.head.as_mut(),
1345        item.patch.as_mut(),
1346        item.trace.as_mut(),
1347    ]
1348    .into_iter()
1349    .flatten()
1350}
1351
1352fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
1353    if let Some(components) = doc.components.as_mut() {
1354        components.security_schemes.clear();
1355    }
1356    for path_item in doc.paths.paths.values_mut() {
1357        for op in [
1358            path_item.get.as_mut(),
1359            path_item.post.as_mut(),
1360            path_item.put.as_mut(),
1361            path_item.delete.as_mut(),
1362            path_item.options.as_mut(),
1363            path_item.head.as_mut(),
1364            path_item.patch.as_mut(),
1365            path_item.trace.as_mut(),
1366        ]
1367        .into_iter()
1368        .flatten()
1369        {
1370            op.security = None;
1371        }
1372    }
1373}
1374
1375async fn require_bearer_auth(
1376    State(state): State<AppState>,
1377    mut request: Request,
1378    next: Next,
1379) -> std::result::Result<Response, ApiError> {
1380    if !state.requires_bearer_auth() {
1381        return Ok(next.run(request).await);
1382    }
1383
1384    let Some(header) = request
1385        .headers()
1386        .get(AUTHORIZATION)
1387        .and_then(|value| value.to_str().ok())
1388    else {
1389        return Err(ApiError::unauthorized("missing bearer token"));
1390    };
1391
1392    let Some(provided_token) = header.strip_prefix("Bearer ") else {
1393        return Err(ApiError::unauthorized("missing bearer token"));
1394    };
1395
1396    let Some(actor) = state.authenticate_bearer_token(provided_token) else {
1397        return Err(ApiError::unauthorized("invalid bearer token"));
1398    };
1399    request.extensions_mut().insert(actor);
1400
1401    Ok(next.run(request).await)
1402}
1403
1404/// Routing middleware (MR-668). Resolves the active graph for the
1405/// request and injects `Arc<GraphHandle>` as an extension so handlers can
1406/// extract it via `Extension<Arc<GraphHandle>>`.
1407///
1408/// **Single mode**: the routing field holds the single handle directly.
1409/// Routes are flat; every request resolves to that handle, regardless
1410/// of the URI path. No registry walk, no sentinel key, no
1411/// programmer-error guard.
1412///
1413/// **Multi mode**: routes are nested under `/graphs/{graph_id}/...`. The
1414/// middleware extracts `{graph_id}` from the URI path and looks it up in
1415/// the registry. Returns 404 if the graph is not registered.
1416///
1417/// The middleware fires AFTER `require_bearer_auth`, so the actor is
1418/// already in the request extensions (or auth was off entirely).
1419async fn resolve_graph_handle(
1420    State(state): State<AppState>,
1421    mut request: Request,
1422    next: Next,
1423) -> std::result::Result<Response, ApiError> {
1424    let handle = match &state.routing {
1425        GraphRouting::Single { handle } => Arc::clone(handle),
1426        GraphRouting::Multi { registry, .. } => {
1427            // `Router::nest("/graphs/{graph_id}", inner)` rewrites
1428            // `request.uri().path()` to the inner suffix (e.g. `/snapshot`).
1429            // The pre-rewrite URI is preserved in the `OriginalUri`
1430            // request extension by axum's router; we read from there to
1431            // extract `{graph_id}`. Fall back to the current URI only if
1432            // the extension is missing, which shouldn't happen for
1433            // nested routes but is safe defensive code.
1434            let original_path: String = request
1435                .extensions()
1436                .get::<OriginalUri>()
1437                .map(|OriginalUri(uri)| uri.path().to_string())
1438                .unwrap_or_else(|| request.uri().path().to_string());
1439            let graph_id_str = original_path
1440                .strip_prefix("/graphs/")
1441                .and_then(|rest| rest.split('/').next())
1442                .filter(|s| !s.is_empty())
1443                .ok_or_else(|| {
1444                    ApiError::bad_request(
1445                        "cluster route missing /graphs/{graph_id} prefix".to_string(),
1446                    )
1447                })?;
1448            let graph_id = GraphId::try_from(graph_id_str.to_string())
1449                .map_err(|err| ApiError::bad_request(err.to_string()))?;
1450            let key = GraphKey::cluster(graph_id.clone());
1451            match registry.get(&key) {
1452                RegistryLookup::Ready(handle) => handle,
1453                RegistryLookup::Gone => {
1454                    return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
1455                }
1456            }
1457        }
1458    };
1459
1460    // Per-request observability. `Span::current().record` would silently
1461    // no-op here because no upstream `#[tracing::instrument(...)]` macro
1462    // declares a `graph_id` field; emit an explicit event instead so the
1463    // routing decision actually lands in logs.
1464    info!(graph_id = %handle.key.graph_id, "graph routed");
1465
1466    request.extensions_mut().insert(handle);
1467    Ok(next.run(request).await)
1468}
1469
1470fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
1471    info!(
1472        actor_id = actor_id,
1473        action = %request.action,
1474        branch = request.branch.as_deref().unwrap_or(""),
1475        target_branch = request.target_branch.as_deref().unwrap_or(""),
1476        allowed = decision.allowed,
1477        matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
1478        "policy decision"
1479    );
1480}
1481
1482/// HTTP-layer Cedar policy gate. Two sources of the policy engine:
1483///   * Per-graph handler — passes `handle.policy.as_deref()` so the
1484///     graph's Cedar rules govern read/change/branch_*/schema_apply.
1485///   * Management handler — passes `state.server_policy.as_deref()` so
1486///     server-level Cedar rules govern `graph_list` (the only shipped
1487///     server-scoped action; runtime `graph_create` / `graph_delete`
1488///     are deferred until a managed cluster catalog lands).
1489///
1490/// The MR-731 invariant lives inside this function: actor identity is
1491/// supplied as a separate argument from the resolved bearer match. The
1492/// `PolicyRequest` struct itself does not carry identity (the field was
1493/// dropped from the type), so handlers cannot smuggle it through the
1494/// request. See `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
1495/// at `tests/server.rs`.
1496fn authorize_request(
1497    actor: Option<&ResolvedActor>,
1498    policy: Option<&PolicyEngine>,
1499    request: PolicyRequest,
1500) -> std::result::Result<(), ApiError> {
1501    let Some(engine) = policy else {
1502        // No PolicyEngine installed. Three runtime states can reach this:
1503        //
1504        // * **Open mode** (`--unauthenticated`): no tokens, no policy.
1505        //   Per-graph operations are open by operator opt-in (they
1506        //   accepted "trust the network" for graph data).
1507        // * **DefaultDeny mode**: tokens configured but no policy. The
1508        //   request went through bearer auth, so `actor` is Some. Only
1509        //   per-graph `Read` is permitted; other per-graph actions
1510        //   return 403. Closes the "configured auth but forgot the
1511        //   policy file" trap from MR-723.
1512        // * Either of the above with a **server-scoped** action
1513        //   (`graph_list`, future `graph_create`/`graph_delete`).
1514        //
1515        // Server-scoped actions are always denied here, regardless of
1516        // mode or actor presence. The management surface leaks server
1517        // topology (graph IDs + URIs that may contain S3 bucket paths
1518        // or internal hostnames) — operators who opted into Open mode
1519        // accepted exposure of graph DATA, not exposure of server
1520        // topology. Closing the management surface by default in every
1521        // runtime state means the docstring contract on
1522        // `server_graphs_list` ("don't leak the registry until the
1523        // operator explicitly authorizes it") holds uniformly; the
1524        // operator's only path to enabling it is configuring an
1525        // explicit `server.policy.file` in omnigraph.yaml.
1526        if request.action.resource_kind() == PolicyResourceKind::Server {
1527            return Err(ApiError::forbidden(
1528                "server-scoped actions require an explicit `server.policy.file` \
1529                 configured in omnigraph.yaml — the management surface is closed \
1530                 by default in every runtime state, including --unauthenticated, \
1531                 so that server topology is never exposed without operator opt-in.",
1532            ));
1533        }
1534        if actor.is_some() && request.action != PolicyAction::Read {
1535            return Err(ApiError::forbidden(
1536                "server runs in default-deny mode (bearer tokens configured but no \
1537                 policy file). Only `read` actions are permitted; configure \
1538                 `policy.file` in omnigraph.yaml to enable other actions.",
1539            ));
1540        }
1541        return Ok(());
1542    };
1543    let Some(actor) = actor else {
1544        return Err(ApiError::unauthorized("missing bearer token"));
1545    };
1546    // SECURITY INVARIANT (MR-731): actor identity is supplied to the
1547    // policy engine here as a separate argument, sourced from the
1548    // bearer-token match resolved by `require_bearer_auth`. The
1549    // `PolicyRequest` struct itself no longer carries `actor_id` (it
1550    // was dropped from the type), so handlers cannot smuggle identity
1551    // through the request body and there is no overwrite step that
1552    // could be skipped. The principle is codified in
1553    // `docs/dev/invariants.md` Hard Invariant 11 ("clients cannot set
1554    // actor identity directly") and pinned by the regression test
1555    // `actor_id_resolves_from_bearer_token_ignoring_client_supplied_headers`
1556    // in `crates/omnigraph-server/tests/server.rs`.
1557    let actor_id = actor.actor_id.as_ref();
1558    let decision = engine
1559        .authorize(actor_id, &request)
1560        .map_err(|err| ApiError::internal(format!("policy: {err}")))?;
1561    log_policy_decision(actor_id, &request, &decision);
1562    if decision.allowed {
1563        Ok(())
1564    } else {
1565        Err(ApiError::forbidden(decision.message))
1566    }
1567}
1568
1569#[utoipa::path(
1570    get,
1571    path = "/snapshot",
1572    tag = "snapshots",
1573    operation_id = "getSnapshot",
1574    params(SnapshotQuery),
1575    responses(
1576        (status = 200, description = "Database snapshot", body = api::SnapshotOutput),
1577        (status = 401, description = "Unauthorized", body = ErrorOutput),
1578        (status = 403, description = "Forbidden", body = ErrorOutput),
1579    ),
1580    security(("bearer_token" = [])),
1581)]
1582/// Read the current snapshot of a branch.
1583///
1584/// Returns the manifest version plus per-table metadata (path, version, row
1585/// count) for every table on the branch. Defaults to `main` when `branch` is
1586/// omitted. Read-only.
1587async fn server_snapshot(
1588    Extension(handle): Extension<Arc<GraphHandle>>,
1589    actor: Option<Extension<ResolvedActor>>,
1590    Query(query): Query<SnapshotQuery>,
1591) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
1592    let branch = query.branch.unwrap_or_else(|| "main".to_string());
1593    authorize_request(
1594        actor.as_ref().map(|Extension(actor)| actor),
1595        handle.policy.as_deref(),
1596        PolicyRequest {
1597            action: PolicyAction::Read,
1598            branch: Some(branch.clone()),
1599            target_branch: None,
1600        },
1601    )?;
1602    let snapshot = {
1603        let db = &handle.engine;
1604        db.snapshot_of(ReadTarget::branch(branch.as_str()))
1605            .await
1606            .map_err(ApiError::from_omni)?
1607    };
1608    Ok(Json(snapshot_payload(&branch, &snapshot)))
1609}
1610
1611/// Header values that flag a response as coming from a deprecated route
1612/// (RFC 9745 / RFC 8288) and point at the canonical successor.
1613fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValue); 2] {
1614    [
1615        (
1616            HeaderName::from_static("deprecation"),
1617            HeaderValue::from_static("true"),
1618        ),
1619        (
1620            HeaderName::from_static("link"),
1621            HeaderValue::from_static(successor_link),
1622        ),
1623    ]
1624}
1625
1626#[utoipa::path(
1627    post,
1628    path = "/read",
1629    tag = "queries",
1630    operation_id = "read",
1631    request_body = ReadRequest,
1632    responses(
1633        (status = 200, description = "Query results (response includes `Deprecation: true` + `Link: </query>; rel=\"successor-version\"`)", body = ReadOutput),
1634        (status = 400, description = "Bad request", body = ErrorOutput),
1635        (status = 401, description = "Unauthorized", body = ErrorOutput),
1636        (status = 403, description = "Forbidden", body = ErrorOutput),
1637    ),
1638    security(("bearer_token" = [])),
1639)]
1640#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
1641/// **Deprecated** — use [`POST /query`](#tag/queries/operation/query) instead.
1642///
1643/// Execute a GQ read query. Behavior is unchanged from prior releases; the
1644/// route is kept indefinitely for byte-stable back-compat. New integrations
1645/// should target `POST /query`, which has clean field names (`query` /
1646/// `name`) and a 400-on-mutation guard. Responses from this route include
1647/// `Deprecation: true` and `Link: </query>; rel="successor-version"`
1648/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
1649/// signal.
1650async fn server_read(
1651    Extension(handle): Extension<Arc<GraphHandle>>,
1652    actor: Option<Extension<ResolvedActor>>,
1653    Json(request): Json<ReadRequest>,
1654) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ReadOutput>), ApiError> {
1655    let (selected_name, target, result) = run_query(
1656        handle,
1657        actor.as_ref().map(|Extension(actor)| actor),
1658        &request.query_source,
1659        request.query_name.as_deref(),
1660        request.params.as_ref(),
1661        request.branch,
1662        request.snapshot,
1663        false, // /read predates the D2 rule; legacy callers may submit mutating queries here
1664    )
1665    .await?;
1666    Ok((
1667        deprecation_headers("</query>; rel=\"successor-version\""),
1668        Json(api::read_output(selected_name, &target, result)),
1669    ))
1670}
1671
1672#[utoipa::path(
1673    post,
1674    path = "/query",
1675    tag = "queries",
1676    operation_id = "query",
1677    request_body = QueryRequest,
1678    responses(
1679        (status = 200, description = "Query results", body = ReadOutput),
1680        (status = 400, description = "Bad request - also returned when the query body contains mutations; use POST /mutate (or its deprecated alias POST /change) for write queries", body = ErrorOutput),
1681        (status = 401, description = "Unauthorized", body = ErrorOutput),
1682        (status = 403, description = "Forbidden", body = ErrorOutput),
1683    ),
1684    security(("bearer_token" = [])),
1685)]
1686/// Execute an inline read query (friendlier-named alternative to `POST /read`).
1687///
1688/// Designed for ad-hoc exploration and AI-agent tool-use: short field
1689/// names (`query`, `name`) match the CLI `-e` flag and the GQ `query`
1690/// keyword. Mutations (`insert`/`update`/`delete`) are rejected with 400
1691/// -- use `POST /mutate` (or its deprecated alias `POST /change`) for
1692/// write queries. Otherwise behaves identically to `POST /read`: same
1693/// target semantics (branch xor snapshot), same Cedar action (Read),
1694/// same response shape.
1695async fn server_query(
1696    Extension(handle): Extension<Arc<GraphHandle>>,
1697    actor: Option<Extension<ResolvedActor>>,
1698    Json(request): Json<QueryRequest>,
1699) -> std::result::Result<Json<ReadOutput>, ApiError> {
1700    let (selected_name, target, result) = run_query(
1701        handle,
1702        actor.as_ref().map(|Extension(actor)| actor),
1703        &request.query,
1704        request.name.as_deref(),
1705        request.params.as_ref(),
1706        request.branch,
1707        request.snapshot,
1708        true, // /query is read-only; reject mutations
1709    )
1710    .await?;
1711    Ok(Json(api::read_output(selected_name, &target, result)))
1712}
1713
1714#[utoipa::path(
1715    post,
1716    path = "/export",
1717    tag = "queries",
1718    operation_id = "export",
1719    request_body = ExportRequest,
1720    responses(
1721        (status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
1722        (status = 400, description = "Bad request", body = ErrorOutput),
1723        (status = 401, description = "Unauthorized", body = ErrorOutput),
1724        (status = 403, description = "Forbidden", body = ErrorOutput),
1725    ),
1726    security(("bearer_token" = [])),
1727)]
1728/// Stream the contents of a branch as NDJSON.
1729///
1730/// Emits one JSON object per line (`application/x-ndjson`). Filter with
1731/// `type_names` (node/edge type names) and/or `table_keys`; both empty
1732/// streams the entire branch. Suitable for large exports — the response is
1733/// streamed, not buffered. Read-only.
1734async fn server_export(
1735    Extension(handle): Extension<Arc<GraphHandle>>,
1736    actor: Option<Extension<ResolvedActor>>,
1737    Json(request): Json<ExportRequest>,
1738) -> std::result::Result<Response, ApiError> {
1739    let branch = request.branch.unwrap_or_else(|| "main".to_string());
1740    authorize_request(
1741        actor.as_ref().map(|Extension(actor)| actor),
1742        handle.policy.as_deref(),
1743        PolicyRequest {
1744            action: PolicyAction::Export,
1745            branch: Some(branch.clone()),
1746            target_branch: None,
1747        },
1748    )?;
1749    let engine = Arc::clone(&handle.engine);
1750    let type_names = request.type_names.clone();
1751    let table_keys = request.table_keys.clone();
1752    let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
1753    tokio::spawn(async move {
1754        let result = {
1755            let mut writer = ExportStreamWriter { sender: tx.clone() };
1756            engine
1757                .export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
1758                .await
1759        };
1760        if let Err(err) = result {
1761            let _ = tx.send(Err(io::Error::other(err.to_string())));
1762        }
1763    });
1764    let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
1765        rx.recv().await.map(|item| (item, rx))
1766    }));
1767    Ok((
1768        StatusCode::OK,
1769        [(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
1770        body,
1771    )
1772        .into_response())
1773}
1774
1775/// Shared implementation behind `POST /mutate` (canonical) and
1776/// `POST /change` (deprecated alias). Returns the bare `ChangeOutput`;
1777/// each route handler wraps it (the alias also attaches Deprecation
1778/// headers).
1779/// Shared backend for `/mutate` (canonical) and `/change` (deprecated alias).
1780///
1781/// Decoupled from `ChangeRequest` so MR-969's `/queries/{name}` stored-query
1782/// handler can call this directly with registry-supplied fields without
1783/// rebuilding the request body. Today's HTTP handlers unpack the request and
1784/// call here; the registry would do the same.
1785async fn run_mutate(
1786    state: AppState,
1787    handle: Arc<GraphHandle>,
1788    actor: Option<&ResolvedActor>,
1789    query: &str,
1790    name: Option<&str>,
1791    params_json: Option<&Value>,
1792    branch: String,
1793) -> std::result::Result<ChangeOutput, ApiError> {
1794    let actor_arc = actor
1795        .map(|a| Arc::clone(&a.actor_id))
1796        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
1797    let actor_id = actor.map(|a| a.actor_id.as_ref());
1798    authorize_request(
1799        actor,
1800        handle.policy.as_deref(),
1801        PolicyRequest {
1802            action: PolicyAction::Change,
1803            branch: Some(branch.clone()),
1804            target_branch: None,
1805        },
1806    )?;
1807    // Per-actor admission: bound concurrent in-flight mutations and
1808    // estimated bytes per actor. Cedar runs FIRST so denied requests
1809    // don't consume admission slots. Estimate uses the request body
1810    // size as a coarse proxy; engine memory pressure can run higher.
1811    let est_bytes = query.len() as u64
1812        + params_json
1813            .map(|p| p.to_string().len() as u64)
1814            .unwrap_or(0);
1815    let _admission = state
1816        .workload
1817        .try_admit(&actor_arc, est_bytes)
1818        .map_err(ApiError::from_workload_reject)?;
1819    let (selected_name, query_params) =
1820        select_named_query(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
1821    let params = query_params_from_json(&query_params, params_json)
1822        .map_err(|err| ApiError::bad_request(err.to_string()))?;
1823
1824    let result = {
1825        let db = &handle.engine;
1826        db.mutate_as(&branch, query, &selected_name, &params, actor_id)
1827            .await
1828            .map_err(ApiError::from_omni)?
1829    };
1830    Ok(ChangeOutput {
1831        branch,
1832        query_name: selected_name,
1833        affected_nodes: result.affected_nodes,
1834        affected_edges: result.affected_edges,
1835        actor_id: actor_id.map(str::to_string),
1836    })
1837}
1838
1839/// Shared backend for `/query` (canonical) and `/read` (deprecated alias).
1840///
1841/// Mirrors [`run_mutate`]'s decoupled shape so MR-969's stored-query handler
1842/// can call here with registry-supplied fields. Rejects inline source that
1843/// contains mutations (D2 rule); callers wanting writes go through
1844/// [`run_mutate`] instead.
1845///
1846/// Intentionally does **not** take [`AppState`] (unlike [`run_mutate`]):
1847/// reads are not admission-gated today, so there is no `state.workload`
1848/// consumer. The signature grows the parameter when Phase 1 (MR-976) adds
1849/// the request envelope's `expect: { max_rows_scanned: N }` budget, or
1850/// MR-969 extends per-actor admission to stored-read invocations.
1851async fn run_query(
1852    handle: Arc<GraphHandle>,
1853    actor: Option<&ResolvedActor>,
1854    query: &str,
1855    name: Option<&str>,
1856    params_json: Option<&Value>,
1857    branch: Option<String>,
1858    snapshot: Option<String>,
1859    reject_mutations: bool,
1860) -> std::result::Result<(String, ReadTarget, omnigraph_compiler::result::QueryResult), ApiError> {
1861    if branch.is_some() && snapshot.is_some() {
1862        return Err(ApiError::bad_request(
1863            "request may specify branch or snapshot, not both",
1864        ));
1865    }
1866
1867    let target = read_target_from_request(branch, snapshot);
1868    let policy_branch = match &target {
1869        ReadTarget::Branch(branch) => Some(branch.clone()),
1870        ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
1871            let db = &handle.engine;
1872            db.resolved_branch_of(target.clone())
1873                .await
1874                .map(|branch| branch.or_else(|| Some("main".to_string())))
1875                .map_err(ApiError::from_omni)?
1876        }
1877        ReadTarget::Snapshot(_) => None,
1878    };
1879    authorize_request(
1880        actor,
1881        handle.policy.as_deref(),
1882        PolicyRequest {
1883            action: PolicyAction::Read,
1884            branch: policy_branch,
1885            target_branch: None,
1886        },
1887    )?;
1888    let query_decl =
1889        select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
1890    if reject_mutations && !query_decl.mutations.is_empty() {
1891        return Err(ApiError::bad_request(format!(
1892            "query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
1893            query_decl.name
1894        )));
1895    }
1896    let selected_name = query_decl.name.clone();
1897    let params = query_params_from_json(&query_decl.params, params_json)
1898        .map_err(|err| ApiError::bad_request(err.to_string()))?;
1899
1900    let result = {
1901        let db = &handle.engine;
1902        db.query(target.clone(), query, &selected_name, &params)
1903            .await
1904            .map_err(ApiError::from_omni)?
1905    };
1906    Ok((selected_name, target, result))
1907}
1908
1909#[utoipa::path(
1910    post,
1911    path = "/change",
1912    tag = "mutations",
1913    operation_id = "change",
1914    request_body = ChangeRequest,
1915    responses(
1916        (status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: </mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
1917        (status = 400, description = "Bad request", body = ErrorOutput),
1918        (status = 401, description = "Unauthorized", body = ErrorOutput),
1919        (status = 403, description = "Forbidden", body = ErrorOutput),
1920        (status = 409, description = "Merge conflict", body = ErrorOutput),
1921        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
1922    ),
1923    security(("bearer_token" = [])),
1924)]
1925#[deprecated(note = "use POST /mutate instead; /change is kept indefinitely for back-compat")]
1926/// **Deprecated** — use [`POST /mutate`](#tag/mutations/operation/mutate) instead.
1927///
1928/// Apply a GQ mutation to a branch. Behavior is unchanged; the route is
1929/// kept indefinitely for back-compat. New integrations should target
1930/// `POST /mutate`, which has identical semantics and a name that pairs
1931/// cleanly with `POST /query`. Responses from this route include
1932/// `Deprecation: true` and `Link: </mutate>; rel="successor-version"`
1933/// headers per RFC 9745 / RFC 8288 so SDKs and proxies can surface the
1934/// signal.
1935async fn server_change(
1936    State(state): State<AppState>,
1937    Extension(handle): Extension<Arc<GraphHandle>>,
1938    actor: Option<Extension<ResolvedActor>>,
1939    Json(request): Json<ChangeRequest>,
1940) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ChangeOutput>), ApiError> {
1941    let branch = request.branch.unwrap_or_else(|| "main".to_string());
1942    let output = run_mutate(
1943        state,
1944        handle,
1945        actor.as_ref().map(|Extension(actor)| actor),
1946        &request.query,
1947        request.name.as_deref(),
1948        request.params.as_ref(),
1949        branch,
1950    )
1951    .await?;
1952    Ok((
1953        deprecation_headers("</mutate>; rel=\"successor-version\""),
1954        Json(output),
1955    ))
1956}
1957
1958#[utoipa::path(
1959    post,
1960    path = "/mutate",
1961    tag = "mutations",
1962    operation_id = "mutate",
1963    request_body = ChangeRequest,
1964    responses(
1965        (status = 200, description = "Mutation results", body = ChangeOutput),
1966        (status = 400, description = "Bad request", body = ErrorOutput),
1967        (status = 401, description = "Unauthorized", body = ErrorOutput),
1968        (status = 403, description = "Forbidden", body = ErrorOutput),
1969        (status = 409, description = "Merge conflict", body = ErrorOutput),
1970        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
1971    ),
1972    security(("bearer_token" = [])),
1973)]
1974/// Apply a GQ mutation to a branch (canonical mutation endpoint).
1975///
1976/// Writes to the named `branch` (defaults to `main`). Mutations are atomic
1977/// per call and produce a new commit. Returns counts of nodes and edges
1978/// affected. **Destructive**: on success the branch is updated; rejected
1979/// mutations may still acquire locks briefly. Returns 409 on merge conflict.
1980///
1981/// Pairs with `POST /query` (read-only). The legacy `POST /change` route
1982/// has identical semantics and is kept as a deprecated alias.
1983async fn server_mutate(
1984    State(state): State<AppState>,
1985    Extension(handle): Extension<Arc<GraphHandle>>,
1986    actor: Option<Extension<ResolvedActor>>,
1987    Json(request): Json<ChangeRequest>,
1988) -> std::result::Result<Json<ChangeOutput>, ApiError> {
1989    let branch = request.branch.unwrap_or_else(|| "main".to_string());
1990    Ok(Json(
1991        run_mutate(
1992            state,
1993            handle,
1994            actor.as_ref().map(|Extension(actor)| actor),
1995            &request.query,
1996            request.name.as_deref(),
1997            request.params.as_ref(),
1998            branch,
1999        )
2000        .await?,
2001    ))
2002}
2003
2004#[utoipa::path(
2005    get,
2006    path = "/schema",
2007    tag = "schema",
2008    operation_id = "getSchema",
2009    responses(
2010        (status = 200, description = "Current schema source", body = SchemaOutput),
2011        (status = 401, description = "Unauthorized", body = ErrorOutput),
2012        (status = 403, description = "Forbidden", body = ErrorOutput),
2013    ),
2014    security(("bearer_token" = [])),
2015)]
2016/// Read the current schema source.
2017///
2018/// Returns the project's schema as a single string in `.pg` source form.
2019/// Useful for clients that want to introspect available types and tables
2020/// before constructing GQ queries. Read-only.
2021async fn server_schema_get(
2022    Extension(handle): Extension<Arc<GraphHandle>>,
2023    actor: Option<Extension<ResolvedActor>>,
2024) -> std::result::Result<Json<SchemaOutput>, ApiError> {
2025    authorize_request(
2026        actor.as_ref().map(|Extension(actor)| actor),
2027        handle.policy.as_deref(),
2028        PolicyRequest {
2029            action: PolicyAction::Read,
2030            branch: None,
2031            target_branch: None,
2032        },
2033    )?;
2034    let schema_source = {
2035        let db = &handle.engine;
2036        db.schema_source().to_string()
2037    };
2038    Ok(Json(SchemaOutput { schema_source }))
2039}
2040
2041#[utoipa::path(
2042    post,
2043    path = "/schema/apply",
2044    tag = "mutations",
2045    operation_id = "applySchema",
2046    request_body = SchemaApplyRequest,
2047    responses(
2048        (status = 200, description = "Schema apply results", body = SchemaApplyOutput),
2049        (status = 400, description = "Bad request", body = ErrorOutput),
2050        (status = 401, description = "Unauthorized", body = ErrorOutput),
2051        (status = 403, description = "Forbidden", body = ErrorOutput),
2052        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2053    ),
2054    security(("bearer_token" = [])),
2055)]
2056/// Apply a schema migration.
2057///
2058/// Diffs `schema_source` against the current schema and applies the resulting
2059/// migration steps (add/drop type, add/drop column, etc.). **Destructive**:
2060/// some steps drop data. Returns the list of steps applied; if `applied` is
2061/// false the diff was unsupported and no changes were made.
2062async fn server_schema_apply(
2063    State(state): State<AppState>,
2064    Extension(handle): Extension<Arc<GraphHandle>>,
2065    actor: Option<Extension<ResolvedActor>>,
2066    Json(request): Json<SchemaApplyRequest>,
2067) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
2068    let actor_arc = actor
2069        .as_ref()
2070        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2071        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2072    let actor_id = actor
2073        .as_ref()
2074        .map(|Extension(actor)| actor.actor_id.as_ref());
2075    authorize_request(
2076        actor.as_ref().map(|Extension(actor)| actor),
2077        handle.policy.as_deref(),
2078        PolicyRequest {
2079            action: PolicyAction::SchemaApply,
2080            branch: None,
2081            target_branch: Some("main".to_string()),
2082        },
2083    )?;
2084    let est_bytes = request.schema_source.len() as u64;
2085    let _admission = state
2086        .workload
2087        .try_admit(&actor_arc, est_bytes)
2088        .map_err(ApiError::from_workload_reject)?;
2089    let result = {
2090        let db = &handle.engine;
2091        // Engine-layer policy enforcement (MR-722): pass the resolved
2092        // actor through so apply_schema_as can call enforce() with the
2093        // authoritative identity. With a policy installed in AppState,
2094        // engine-side enforcement re-checks the same decision the
2095        // HTTP-layer authorize_request just made above. PR #3 collapses
2096        // the redundancy.
2097        db.apply_schema_as(
2098            &request.schema_source,
2099            omnigraph::db::SchemaApplyOptions {
2100                allow_data_loss: request.allow_data_loss,
2101            },
2102            actor_id,
2103        )
2104        .await
2105        .map_err(ApiError::from_omni)?
2106    };
2107    Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
2108}
2109
2110#[utoipa::path(
2111    post,
2112    path = "/ingest",
2113    tag = "mutations",
2114    operation_id = "ingest",
2115    request_body = IngestRequest,
2116    responses(
2117        (status = 200, description = "Ingest results", body = IngestOutput),
2118        (status = 400, description = "Bad request", body = ErrorOutput),
2119        (status = 401, description = "Unauthorized", body = ErrorOutput),
2120        (status = 403, description = "Forbidden", body = ErrorOutput),
2121        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2122    ),
2123    security(("bearer_token" = [])),
2124)]
2125/// Bulk-ingest NDJSON data into a branch.
2126///
2127/// `data` is NDJSON with one record per line. `mode` controls behavior on
2128/// existing rows: `merge` upserts by id (default), `append` blindly inserts,
2129/// `overwrite` replaces table contents. If `branch` does not exist it is
2130/// created from `from` (defaults to `main`). **Destructive** when `mode` is
2131/// `overwrite` or when ingest produces conflicting writes.
2132async fn server_ingest(
2133    State(state): State<AppState>,
2134    Extension(handle): Extension<Arc<GraphHandle>>,
2135    actor: Option<Extension<ResolvedActor>>,
2136    Json(request): Json<IngestRequest>,
2137) -> std::result::Result<Json<IngestOutput>, ApiError> {
2138    let branch = request.branch.unwrap_or_else(|| "main".to_string());
2139    let from = request.from.unwrap_or_else(|| "main".to_string());
2140    let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
2141    let actor_arc = actor
2142        .as_ref()
2143        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2144        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2145    let actor_id = actor
2146        .as_ref()
2147        .map(|Extension(actor)| actor.actor_id.as_ref());
2148
2149    let branch_exists = {
2150        let db = &handle.engine;
2151        db.branch_list()
2152            .await
2153            .map_err(ApiError::from_omni)?
2154            .into_iter()
2155            .any(|name| name == branch)
2156    };
2157
2158    if !branch_exists {
2159        authorize_request(
2160            actor.as_ref().map(|Extension(actor)| actor),
2161            handle.policy.as_deref(),
2162            PolicyRequest {
2163                action: PolicyAction::BranchCreate,
2164                branch: Some(from.clone()),
2165                target_branch: Some(branch.clone()),
2166            },
2167        )?;
2168    }
2169    authorize_request(
2170        actor.as_ref().map(|Extension(actor)| actor),
2171        handle.policy.as_deref(),
2172        PolicyRequest {
2173            action: PolicyAction::Change,
2174            branch: Some(branch.clone()),
2175            target_branch: None,
2176        },
2177    )?;
2178    let est_bytes = request.data.len() as u64;
2179    let _admission = state
2180        .workload
2181        .try_admit(&actor_arc, est_bytes)
2182        .map_err(ApiError::from_workload_reject)?;
2183
2184    let result = {
2185        let db = &handle.engine;
2186        db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
2187            .await
2188            .map_err(ApiError::from_omni)?
2189    };
2190
2191    Ok(Json(ingest_output(
2192        handle.uri.as_str(),
2193        &result,
2194        actor_id.map(str::to_string),
2195    )))
2196}
2197
2198#[utoipa::path(
2199    get,
2200    path = "/branches",
2201    tag = "branches",
2202    operation_id = "listBranches",
2203    responses(
2204        (status = 200, description = "List of branches", body = BranchListOutput),
2205        (status = 401, description = "Unauthorized", body = ErrorOutput),
2206        (status = 403, description = "Forbidden", body = ErrorOutput),
2207    ),
2208    security(("bearer_token" = [])),
2209)]
2210/// List all branches.
2211///
2212/// Returns branch names sorted alphabetically. Read-only.
2213async fn server_branch_list(
2214    Extension(handle): Extension<Arc<GraphHandle>>,
2215    actor: Option<Extension<ResolvedActor>>,
2216) -> std::result::Result<Json<BranchListOutput>, ApiError> {
2217    authorize_request(
2218        actor.as_ref().map(|Extension(actor)| actor),
2219        handle.policy.as_deref(),
2220        PolicyRequest {
2221            action: PolicyAction::Read,
2222            branch: None,
2223            target_branch: None,
2224        },
2225    )?;
2226    let mut branches = {
2227        let db = &handle.engine;
2228        db.branch_list().await.map_err(ApiError::from_omni)?
2229    };
2230    branches.sort();
2231    Ok(Json(BranchListOutput { branches }))
2232}
2233
2234#[utoipa::path(
2235    post,
2236    path = "/branches",
2237    tag = "branches",
2238    operation_id = "createBranch",
2239    request_body = BranchCreateRequest,
2240    responses(
2241        (status = 200, description = "Branch created", body = BranchCreateOutput),
2242        (status = 400, description = "Bad request", body = ErrorOutput),
2243        (status = 401, description = "Unauthorized", body = ErrorOutput),
2244        (status = 403, description = "Forbidden", body = ErrorOutput),
2245        (status = 409, description = "Branch already exists", body = ErrorOutput),
2246        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2247    ),
2248    security(("bearer_token" = [])),
2249)]
2250/// Create a new branch.
2251///
2252/// Forks `name` off of `from` (defaults to `main`). The new branch shares
2253/// table data with its parent until it is mutated. Returns 409 if `name`
2254/// already exists.
2255async fn server_branch_create(
2256    State(state): State<AppState>,
2257    Extension(handle): Extension<Arc<GraphHandle>>,
2258    actor: Option<Extension<ResolvedActor>>,
2259    Json(request): Json<BranchCreateRequest>,
2260) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
2261    let from = request.from.unwrap_or_else(|| "main".to_string());
2262    let actor_arc = actor
2263        .as_ref()
2264        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2265        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2266    authorize_request(
2267        actor.as_ref().map(|Extension(actor)| actor),
2268        handle.policy.as_deref(),
2269        PolicyRequest {
2270            action: PolicyAction::BranchCreate,
2271            branch: Some(from.clone()),
2272            target_branch: Some(request.name.clone()),
2273        },
2274    )?;
2275    // Branch metadata only — small constant bytes estimate. The Lance
2276    // shallow-clone work is bounded by the parent's manifest size, not
2277    // the request body.
2278    let _admission = state
2279        .workload
2280        .try_admit(&actor_arc, 256)
2281        .map_err(ApiError::from_workload_reject)?;
2282    {
2283        let db = &handle.engine;
2284        db.branch_create_from_as(
2285            ReadTarget::branch(&from),
2286            &request.name,
2287            actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
2288        )
2289        .await
2290        .map_err(ApiError::from_omni)?;
2291    }
2292    Ok(Json(BranchCreateOutput {
2293        uri: handle.uri.clone(),
2294        from,
2295        name: request.name,
2296        actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
2297    }))
2298}
2299
/// Path parameters for [`server_branch_delete`].
///
/// The capture is deserialized by field name rather than as `Path<String>`
/// or `Path<(String,)>`. That keeps the extractor working for both the flat
/// single-mode routes and the nested multi-mode routes. The `{branch}`
/// capture is picked by name, and any other captures in scope (e.g.
/// `{graph_id}` in multi-mode) are ignored instead of breaking
/// deserialization.
///
/// This prevents a class of bugs where a handler's path extractor depends
/// on capture position and breaks when route nesting changes.
#[derive(Deserialize)]
struct BranchPath {
    // Branch name taken from the `{branch}` route segment.
    branch: String,
}
2313
2314#[utoipa::path(
2315    delete,
2316    path = "/branches/{branch}",
2317    tag = "branches",
2318    operation_id = "deleteBranch",
2319    params(
2320        ("branch" = String, Path, description = "Branch name to delete"),
2321    ),
2322    responses(
2323        (status = 200, description = "Branch deleted", body = BranchDeleteOutput),
2324        (status = 401, description = "Unauthorized", body = ErrorOutput),
2325        (status = 403, description = "Forbidden", body = ErrorOutput),
2326        (status = 404, description = "Branch not found", body = ErrorOutput),
2327        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2328    ),
2329    security(("bearer_token" = [])),
2330)]
2331/// Delete a branch.
2332///
2333/// **Irreversible.** Removes the branch pointer; commits remain reachable
2334/// only if referenced by another branch. Returns 404 if the branch does not
2335/// exist.
2336async fn server_branch_delete(
2337    State(state): State<AppState>,
2338    Extension(handle): Extension<Arc<GraphHandle>>,
2339    actor: Option<Extension<ResolvedActor>>,
2340    Path(BranchPath { branch }): Path<BranchPath>,
2341) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
2342    let actor_arc = actor
2343        .as_ref()
2344        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2345        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2346    let actor_id = actor
2347        .as_ref()
2348        .map(|Extension(actor)| actor.actor_id.as_ref());
2349    authorize_request(
2350        actor.as_ref().map(|Extension(actor)| actor),
2351        handle.policy.as_deref(),
2352        PolicyRequest {
2353            action: PolicyAction::BranchDelete,
2354            branch: None,
2355            target_branch: Some(branch.clone()),
2356        },
2357    )?;
2358    // Metadata-only manifest tombstone — small constant estimate.
2359    let _admission = state
2360        .workload
2361        .try_admit(&actor_arc, 256)
2362        .map_err(ApiError::from_workload_reject)?;
2363    {
2364        let db = &handle.engine;
2365        db.branch_delete_as(&branch, actor_id)
2366            .await
2367            .map_err(ApiError::from_omni)?;
2368    }
2369    Ok(Json(BranchDeleteOutput {
2370        uri: handle.uri.clone(),
2371        name: branch,
2372        actor_id: actor_id.map(str::to_string),
2373    }))
2374}
2375
2376#[utoipa::path(
2377    post,
2378    path = "/branches/merge",
2379    tag = "branches",
2380    operation_id = "mergeBranches",
2381    request_body = BranchMergeRequest,
2382    responses(
2383        (status = 200, description = "Branches merged", body = BranchMergeOutput),
2384        (status = 400, description = "Bad request", body = ErrorOutput),
2385        (status = 401, description = "Unauthorized", body = ErrorOutput),
2386        (status = 403, description = "Forbidden", body = ErrorOutput),
2387        (status = 409, description = "Merge conflict", body = ErrorOutput),
2388        (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2389    ),
2390    security(("bearer_token" = [])),
2391)]
2392/// Merge one branch into another.
2393///
2394/// Merges `source` into `target` (defaults to `main`). Outcome is one of
2395/// `already_up_to_date`, `fast_forward`, or `merged`. Returns 409 with the
2396/// list of conflicts if the merge cannot be completed; the target is left
2397/// unchanged in that case. **Destructive** to `target` on success.
2398async fn server_branch_merge(
2399    State(state): State<AppState>,
2400    Extension(handle): Extension<Arc<GraphHandle>>,
2401    actor: Option<Extension<ResolvedActor>>,
2402    Json(request): Json<BranchMergeRequest>,
2403) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
2404    let target = request.target.unwrap_or_else(|| "main".to_string());
2405    let actor_arc = actor
2406        .as_ref()
2407        .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2408        .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2409    let actor_id = actor
2410        .as_ref()
2411        .map(|Extension(actor)| actor.actor_id.as_ref());
2412    authorize_request(
2413        actor.as_ref().map(|Extension(actor)| actor),
2414        handle.policy.as_deref(),
2415        PolicyRequest {
2416            action: PolicyAction::BranchMerge,
2417            branch: Some(request.source.clone()),
2418            target_branch: Some(target.clone()),
2419        },
2420    )?;
2421    // Merge body is small JSON; the heavy work is in the engine but is
2422    // bounded per-(table, branch) by the writer queue. Small constant
2423    // estimate suffices for the actor in-flight count.
2424    let _admission = state
2425        .workload
2426        .try_admit(&actor_arc, 256)
2427        .map_err(ApiError::from_workload_reject)?;
2428    let outcome = {
2429        let db = &handle.engine;
2430        db.branch_merge_as(&request.source, &target, actor_id)
2431            .await
2432            .map_err(ApiError::from_omni)?
2433    };
2434    Ok(Json(BranchMergeOutput {
2435        source: request.source,
2436        target,
2437        outcome: outcome.into(),
2438        actor_id: actor_id.map(str::to_string),
2439    }))
2440}
2441
2442#[utoipa::path(
2443    get,
2444    path = "/commits",
2445    tag = "commits",
2446    operation_id = "listCommits",
2447    params(CommitListQuery),
2448    responses(
2449        (status = 200, description = "List of commits", body = CommitListOutput),
2450        (status = 401, description = "Unauthorized", body = ErrorOutput),
2451        (status = 403, description = "Forbidden", body = ErrorOutput),
2452    ),
2453    security(("bearer_token" = [])),
2454)]
2455/// List commits.
2456///
2457/// Filter by `branch` to get the commits on a single branch (most recent
2458/// first); omit to list across all branches. Read-only.
2459async fn server_commit_list(
2460    Extension(handle): Extension<Arc<GraphHandle>>,
2461    actor: Option<Extension<ResolvedActor>>,
2462    Query(query): Query<CommitListQuery>,
2463) -> std::result::Result<Json<CommitListOutput>, ApiError> {
2464    authorize_request(
2465        actor.as_ref().map(|Extension(actor)| actor),
2466        handle.policy.as_deref(),
2467        PolicyRequest {
2468            action: PolicyAction::Read,
2469            branch: query.branch.clone(),
2470            target_branch: None,
2471        },
2472    )?;
2473    let commits = {
2474        let db = &handle.engine;
2475        db.list_commits(query.branch.as_deref())
2476            .await
2477            .map_err(ApiError::from_omni)?
2478    };
2479    Ok(Json(CommitListOutput {
2480        commits: commits.iter().map(api::commit_output).collect(),
2481    }))
2482}
2483
/// Path parameters for [`server_commit_show`].
///
/// Uses the same named-field extraction as [`BranchPath`]. The
/// `{commit_id}` capture is picked by name, so other captures in scope on
/// nested routes do not break deserialization.
#[derive(Deserialize)]
struct CommitPath {
    // Commit identifier taken from the `{commit_id}` route segment.
    commit_id: String,
}
2490
2491#[utoipa::path(
2492    get,
2493    path = "/commits/{commit_id}",
2494    tag = "commits",
2495    operation_id = "getCommit",
2496    params(
2497        ("commit_id" = String, Path, description = "Commit identifier"),
2498    ),
2499    responses(
2500        (status = 200, description = "Commit details", body = api::CommitOutput),
2501        (status = 401, description = "Unauthorized", body = ErrorOutput),
2502        (status = 403, description = "Forbidden", body = ErrorOutput),
2503        (status = 404, description = "Commit not found", body = ErrorOutput),
2504    ),
2505    security(("bearer_token" = [])),
2506)]
2507
2508/// Get a single commit.
2509///
2510/// Returns the commit's manifest version, parent commit(s), and creation
2511/// metadata. Read-only.
2512async fn server_commit_show(
2513    Extension(handle): Extension<Arc<GraphHandle>>,
2514    actor: Option<Extension<ResolvedActor>>,
2515    Path(CommitPath { commit_id }): Path<CommitPath>,
2516) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
2517    authorize_request(
2518        actor.as_ref().map(|Extension(actor)| actor),
2519        handle.policy.as_deref(),
2520        PolicyRequest {
2521            action: PolicyAction::Read,
2522            branch: None,
2523            target_branch: None,
2524        },
2525    )?;
2526    let commit = {
2527        let db = &handle.engine;
2528        db.get_commit(&commit_id)
2529            .await
2530            .map_err(ApiError::from_omni)?
2531    };
2532    Ok(Json(api::commit_output(&commit)))
2533}
2534
2535fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
2536    if let Some(snapshot) = snapshot {
2537        ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
2538    } else {
2539        ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
2540    }
2541}
2542
2543fn select_named_query_decl(
2544    query_source: &str,
2545    requested_name: Option<&str>,
2546) -> Result<omnigraph_compiler::query::ast::QueryDecl> {
2547    let parsed = parse_query(query_source)?;
2548    let query = if let Some(name) = requested_name {
2549        parsed
2550            .queries
2551            .into_iter()
2552            .find(|query| query.name == name)
2553            .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
2554    } else if parsed.queries.len() == 1 {
2555        parsed.queries.into_iter().next().unwrap()
2556    } else {
2557        bail!("query file contains multiple queries; pass --name");
2558    };
2559    Ok(query)
2560}
2561
2562fn select_named_query(
2563    query_source: &str,
2564    requested_name: Option<&str>,
2565) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
2566    let query = select_named_query_decl(query_source, requested_name)?;
2567    Ok((query.name, query.params))
2568}
2569
2570fn query_params_from_json(
2571    query_params: &[omnigraph_compiler::query::ast::Param],
2572    params_json: Option<&Value>,
2573) -> Result<ParamMap> {
2574    json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
2575        .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
2576}
2577
/// Trims surrounding whitespace from an optional configuration value.
///
/// Returns `None` when the input is `None` or is empty after trimming, so
/// "unset" and "set but blank" are treated the same way. Interior
/// whitespace is left untouched.
fn normalize_bearer_token(value: Option<String>) -> Option<String> {
    let raw = value?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
2583
2584fn normalize_bearer_actor(value: String) -> Result<String> {
2585    let value = value.trim().to_string();
2586    if value.is_empty() {
2587        bail!("bearer token actor names must not be blank");
2588    }
2589    Ok(value)
2590}
2591
2592fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
2593    let entries: HashMap<String, String> = serde_json::from_str(value)
2594        .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
2595    Ok(entries.into_iter().collect())
2596}
2597
2598fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
2599    let contents = fs::read_to_string(path)
2600        .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
2601    parse_bearer_tokens_json(&contents)
2602        .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
2603}
2604
2605fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
2606    let mut seen_actors = HashSet::new();
2607    let mut seen_tokens = HashSet::new();
2608    let mut normalized = Vec::with_capacity(entries.len());
2609
2610    for (actor, token) in entries {
2611        let actor = normalize_bearer_actor(actor)?;
2612        let Some(token) = normalize_bearer_token(Some(token)) else {
2613            bail!("bearer token for actor '{actor}' must not be blank");
2614        };
2615        if !seen_actors.insert(actor.clone()) {
2616            bail!("duplicate bearer token actor '{actor}'");
2617        }
2618        if !seen_tokens.insert(token.clone()) {
2619            bail!("duplicate bearer token value configured");
2620        }
2621        normalized.push((actor, token));
2622    }
2623
2624    normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
2625    Ok(normalized)
2626}
2627
2628fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
2629    let mut entries = Vec::new();
2630
2631    if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
2632    {
2633        entries.push(("default".to_string(), token));
2634    }
2635
2636    if let Some(path) =
2637        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
2638    {
2639        entries.extend(read_bearer_tokens_file(&path)?);
2640    } else if let Some(json) =
2641        normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
2642    {
2643        entries.extend(parse_bearer_tokens_json(&json)?);
2644    }
2645
2646    validate_bearer_tokens(entries)
2647}
2648
2649#[cfg(test)]
2650mod tests {
2651    use super::{
2652        GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
2653        classify_server_runtime_state, hash_bearer_token, load_server_settings,
2654        normalize_bearer_token, parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
2655    };
2656    use serial_test::serial;
2657    use std::env;
2658    use std::fs;
2659    use tempfile::tempdir;
2660
2661    #[test]
2662    fn hash_bearer_token_produces_32_byte_output() {
2663        let hash = hash_bearer_token("any-token");
2664        assert_eq!(hash.len(), 32);
2665    }
2666
2667    #[test]
2668    fn hash_bearer_token_is_deterministic() {
2669        assert_eq!(
2670            hash_bearer_token("stable-input"),
2671            hash_bearer_token("stable-input"),
2672        );
2673    }
2674
2675    #[test]
2676    fn hash_bearer_token_differs_for_different_inputs() {
2677        assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
2678    }
2679
2680    #[test]
2681    fn hash_bearer_token_matches_known_sha256_vector() {
2682        // SHA-256("abc"). If this ever fails, the hash function was swapped.
2683        let hash = hash_bearer_token("abc");
2684        let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
2685        assert_eq!(
2686            hex,
2687            "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
2688        );
2689    }
2690
2691    #[test]
2692    fn server_settings_load_from_yaml_config() {
2693        let temp = tempdir().unwrap();
2694        let config = temp.path().join("omnigraph.yaml");
2695        fs::write(
2696            &config,
2697            r#"
2698graphs:
2699  local:
2700    uri: /tmp/demo.omni
2701server:
2702  graph: local
2703  bind: 0.0.0.0:9090
2704"#,
2705        )
2706        .unwrap();
2707
2708        let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
2709        match &settings.mode {
2710            ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/demo.omni"),
2711            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
2712        }
2713        assert_eq!(settings.bind, "0.0.0.0:9090");
2714    }
2715
2716    #[test]
2717    fn server_settings_cli_flags_override_yaml_config() {
2718        let temp = tempdir().unwrap();
2719        let config = temp.path().join("omnigraph.yaml");
2720        fs::write(
2721            &config,
2722            r#"
2723graphs:
2724  local:
2725    uri: /tmp/demo.omni
2726server:
2727  graph: local
2728  bind: 127.0.0.1:8080
2729"#,
2730        )
2731        .unwrap();
2732
2733        let settings = load_server_settings(
2734            Some(&config),
2735            Some("/tmp/override.omni".to_string()),
2736            None,
2737            Some("0.0.0.0:9999".to_string()),
2738            false,
2739        )
2740        .unwrap();
2741        match &settings.mode {
2742            ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/override.omni"),
2743            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
2744        }
2745        assert_eq!(settings.bind, "0.0.0.0:9999");
2746    }
2747
2748    #[test]
2749    fn server_settings_can_resolve_named_target() {
2750        let temp = tempdir().unwrap();
2751        let config = temp.path().join("omnigraph.yaml");
2752        fs::write(
2753            &config,
2754            r#"
2755graphs:
2756  local:
2757    uri: ./demo.omni
2758  dev:
2759    uri: http://127.0.0.1:8080
2760server:
2761  graph: local
2762  bind: 127.0.0.1:8080
2763"#,
2764        )
2765        .unwrap();
2766
2767        let settings =
2768            load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
2769                .unwrap();
2770        match &settings.mode {
2771            ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "http://127.0.0.1:8080"),
2772            ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
2773        }
2774    }
2775
2776    #[test]
2777    fn server_settings_require_uri_from_cli_or_config() {
2778        let error = load_server_settings(None, None, None, None, false).unwrap_err();
2779        assert!(
2780            error.to_string().contains("no graph to serve"),
2781            "expected mode-inference error, got: {error}",
2782        );
2783    }
2784
2785    #[test]
2786    fn classify_open_requires_explicit_unauthenticated_flag() {
2787        // State 1: no tokens, no policy, no flag → refuse to start.
2788        let error = classify_server_runtime_state(false, false, false).unwrap_err();
2789        let msg = error.to_string();
2790        assert!(
2791            msg.contains("--unauthenticated"),
2792            "expected refusal message mentioning --unauthenticated, got: {msg}"
2793        );
2794
2795        // Same matrix cell but with the flag set → Open mode permitted.
2796        assert_eq!(
2797            classify_server_runtime_state(false, false, true).unwrap(),
2798            ServerRuntimeState::Open
2799        );
2800    }
2801
2802    #[test]
2803    fn classify_tokens_without_policy_is_default_deny() {
2804        // State 2: tokens configured, no policy → DefaultDeny regardless
2805        // of the flag (the flag opts into the fully-open dev mode; it
2806        // doesn't downgrade default-deny back to open).
2807        assert_eq!(
2808            classify_server_runtime_state(true, false, false).unwrap(),
2809            ServerRuntimeState::DefaultDeny
2810        );
2811        assert_eq!(
2812            classify_server_runtime_state(true, false, true).unwrap(),
2813            ServerRuntimeState::DefaultDeny
2814        );
2815    }
2816
2817    #[tokio::test]
2818    #[serial]
2819    async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
2820        // Bug 2 from the bot-review pass: multi-mode startup was missing
2821        // the "policy requires tokens" check that single-mode enforces.
2822        // After centralizing the check in `classify_server_runtime_state`,
2823        // both modes get the same enforcement. This test guards the
2824        // multi-mode propagation path.
2825        //
2826        // Sibling test below pins single mode. Together they pin that
2827        // the classifier is called from both branches of `serve()`.
2828        let _guard = EnvGuard::set(&[
2829            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
2830            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
2831            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
2832            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
2833            ("OMNIGRAPH_UNAUTHENTICATED", None),
2834        ]);
2835        let temp = tempdir().unwrap();
2836        // The classifier reads `has_policy_configured` from the config
2837        // shape (does the Option contain a path?), not from file
2838        // existence, so we can hand it a path without writing a real
2839        // policy file — the bail fires before policy load.
2840        let policy_path = temp.path().join("server-policy.yaml");
2841        let config = ServerConfig {
2842            mode: ServerConfigMode::Multi {
2843                graphs: vec![GraphStartupConfig {
2844                    graph_id: "alpha".to_string(),
2845                    uri: temp
2846                        .path()
2847                        .join("alpha.omni")
2848                        .to_string_lossy()
2849                        .into_owned(),
2850                    policy_file: None,
2851                }],
2852                config_path: temp.path().join("omnigraph.yaml"),
2853                server_policy_file: Some(policy_path),
2854            },
2855            bind: "127.0.0.1:0".to_string(),
2856            allow_unauthenticated: false,
2857        };
2858        let result = serve(config).await;
2859        let err = result
2860            .expect_err("serve should refuse to start in multi mode with policy but no tokens");
2861        let msg = format!("{:?}", err);
2862        assert!(
2863            msg.contains("policy file is configured but no bearer tokens"),
2864            "expected policy-without-tokens rejection in multi mode, got: {msg}",
2865        );
2866    }
2867
2868    #[tokio::test]
2869    #[serial]
2870    async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
2871        // MR-723 PR A: pin the integration boundary that the classifier
2872        // is actually called by `serve()` before any side-effecting
2873        // work (Lance dataset open, TcpListener::bind). The classifier
2874        // itself is unit-tested above; this test guards the propagation
2875        // path from `classify_server_runtime_state` through serve's
2876        // `?` so a future refactor that drops the call returns red.
2877        //
2878        // Marked `#[serial]` because we have to clear all bearer-token
2879        // env vars, and another test in this module setting any of them
2880        // concurrently would corrupt the read inside `resolve_token_source`.
2881        let _guard = EnvGuard::set(&[
2882            ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
2883            ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
2884            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
2885            ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
2886            ("OMNIGRAPH_UNAUTHENTICATED", None),
2887        ]);
2888        let temp = tempdir().unwrap();
2889        // Graph path doesn't need to exist — classifier fires before
2890        // `AppState::open_with_bearer_tokens_and_policy`.
2891        let config = ServerConfig {
2892            mode: ServerConfigMode::Single {
2893                uri: temp
2894                    .path()
2895                    .join("graph.omni")
2896                    .to_string_lossy()
2897                    .into_owned(),
2898                policy_file: None,
2899            },
2900            bind: "127.0.0.1:0".to_string(),
2901            allow_unauthenticated: false,
2902        };
2903        let result = serve(config).await;
2904        let err =
2905            result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
2906        let msg = format!("{:?}", err);
2907        assert!(
2908            msg.contains("no bearer tokens") || msg.contains("policy file"),
2909            "expected refusal message naming the misconfiguration, got: {msg}",
2910        );
2911    }
2912
2913    #[test]
2914    #[serial]
2915    fn unauthenticated_env_var_classification() {
2916        // MR-723 PR A: closes the gap where the env-var read path inside
2917        // `load_server_settings` was structurally implemented but not
2918        // exercised by any test. Three properties to pin, all in one
2919        // sequential test because `cargo test` runs the mod test suite
2920        // in parallel and `OMNIGRAPH_UNAUTHENTICATED` is process-global
2921        // — interleaving with another test that sets the same env var
2922        // (concurrent classifier tests, even the bearer-token suite
2923        // sharing `EnvGuard`) corrupts the read. Sequential within one
2924        // test fn is the simplest race-free shape.
2925        let temp = tempdir().unwrap();
2926        let config_path = temp.path().join("omnigraph.yaml");
2927        fs::write(
2928            &config_path,
2929            r#"
2930graphs:
2931  local:
2932    uri: /tmp/demo-unauth.omni
2933server:
2934  graph: local
2935"#,
2936        )
2937        .unwrap();
2938
2939        // Truthy values flip Open mode on, even with CLI flag off.
2940        for value in ["1", "true", "yes", "TRUE", "anything"] {
2941            let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
2942            let settings = load_server_settings(Some(&config_path), None, None, None, false)
2943                .expect("settings load should succeed");
2944            assert!(
2945                settings.allow_unauthenticated,
2946                "OMNIGRAPH_UNAUTHENTICATED={value:?} should enable Open mode",
2947            );
2948        }
2949
2950        // Falsy values keep refusal behavior, even with CLI flag off.
2951        for value in ["0", "false", "FALSE", ""] {
2952            let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
2953            let settings = load_server_settings(Some(&config_path), None, None, None, false)
2954                .expect("settings load should succeed");
2955            assert!(
2956                !settings.allow_unauthenticated,
2957                "OMNIGRAPH_UNAUTHENTICATED={value:?} should NOT enable Open mode",
2958            );
2959        }
2960
2961        // Unset env var: also false.
2962        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
2963        let settings = load_server_settings(Some(&config_path), None, None, None, false)
2964            .expect("settings load should succeed");
2965        assert!(
2966            !settings.allow_unauthenticated,
2967            "OMNIGRAPH_UNAUTHENTICATED unset should NOT enable Open mode",
2968        );
2969        drop(_guard);
2970
2971        // CLI flag wins even when env is falsy — `serve()` honors the
2972        // OR of both inputs.
2973        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
2974        let settings = load_server_settings(Some(&config_path), None, None, None, true)
2975            .expect("settings load should succeed");
2976        assert!(
2977            settings.allow_unauthenticated,
2978            "--unauthenticated CLI flag should win even when env is falsy",
2979        );
2980    }
2981
2982    #[test]
2983    fn classify_policy_enabled_requires_tokens() {
2984        // State 3: tokens + policy → PolicyEnabled, regardless of the
2985        // `allow_unauthenticated` flag (Cedar evaluates the bearer,
2986        // the flag is moot once tokens exist).
2987        assert_eq!(
2988            classify_server_runtime_state(true, true, false).unwrap(),
2989            ServerRuntimeState::PolicyEnabled
2990        );
2991        assert_eq!(
2992            classify_server_runtime_state(true, true, true).unwrap(),
2993            ServerRuntimeState::PolicyEnabled
2994        );
2995    }
2996
2997    #[test]
2998    fn classify_policy_without_tokens_is_rejected() {
2999        // Closes the "policy installed but no tokens → silent 401 on
3000        // every request" footgun. The same shape that single-mode
3001        // `open_with_bearer_tokens_and_policy` used to bail on
3002        // privately is now rejected by the classifier so both single
3003        // and multi mode get the same enforcement from one source of
3004        // truth.
3005        for allow_unauthenticated in [false, true] {
3006            let err =
3007                classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
3008            let msg = err.to_string();
3009            assert!(
3010                msg.contains("policy file is configured but no bearer tokens"),
3011                "expected policy-without-tokens rejection message; got: {msg}"
3012            );
3013            assert!(
3014                msg.contains("every request would 401"),
3015                "rejection message must name the failure mode; got: {msg}"
3016            );
3017        }
3018    }
3019
3020    #[test]
3021    fn normalize_bearer_token_trims_and_filters_blank_values() {
3022        assert_eq!(normalize_bearer_token(None), None);
3023        assert_eq!(normalize_bearer_token(Some("   ".to_string())), None);
3024        assert_eq!(
3025            normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
3026            Some("demo-token")
3027        );
3028    }
3029
3030    struct EnvGuard {
3031        saved: Vec<(&'static str, Option<String>)>,
3032    }
3033
3034    impl EnvGuard {
3035        fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
3036            let saved = vars
3037                .iter()
3038                .map(|(name, _)| (*name, env::var(name).ok()))
3039                .collect::<Vec<_>>();
3040            for (name, value) in vars {
3041                unsafe {
3042                    match value {
3043                        Some(value) => env::set_var(name, value),
3044                        None => env::remove_var(name),
3045                    }
3046                }
3047            }
3048            Self { saved }
3049        }
3050    }
3051
3052    impl Drop for EnvGuard {
3053        fn drop(&mut self) {
3054            for (name, value) in self.saved.drain(..) {
3055                unsafe {
3056                    match value {
3057                        Some(value) => env::set_var(name, value),
3058                        None => env::remove_var(name),
3059                    }
3060                }
3061            }
3062        }
3063    }
3064
3065    #[test]
3066    fn parse_bearer_tokens_json_reads_actor_token_map() {
3067        let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
3068        assert_eq!(tokens.len(), 2);
3069        assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
3070        assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
3071    }
3072
3073    #[test]
3074    #[serial]
3075    fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
3076        let temp = tempdir().unwrap();
3077        let tokens_path = temp.path().join("tokens.json");
3078        fs::write(
3079            &tokens_path,
3080            r#"{"team-01":"token-one","team-02":"token-two"}"#,
3081        )
3082        .unwrap();
3083
3084        let _guard = EnvGuard::set(&[
3085            ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
3086            (
3087                "OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
3088                Some(tokens_path.to_str().unwrap()),
3089            ),
3090            ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3091        ]);
3092
3093        let tokens = server_bearer_tokens_from_env().unwrap();
3094        assert_eq!(
3095            tokens,
3096            vec![
3097                ("default".to_string(), "legacy-token".to_string()),
3098                ("team-01".to_string(), "token-one".to_string()),
3099                ("team-02".to_string(), "token-two".to_string()),
3100            ]
3101        );
3102    }
3103}