Skip to main content

omnigraph_server/
lib.rs

1pub mod api;
2mod handlers;
3mod settings;
4use handlers::*;
5use settings::*;
6pub use settings::{ServerRuntimeState, classify_server_runtime_state, load_server_settings};
7pub mod auth;
8pub mod graph_id;
9pub mod identity;
10pub mod policy;
11pub mod queries;
12pub mod registry;
13pub mod workload;
14
15pub use graph_id::GraphId;
16pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
17pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
18
19use crate::queries::{QueryRegistry, check, format_check_breakages};
20
21use std::collections::{BTreeMap, HashMap, HashSet};
22use std::fs;
23use std::io;
24use std::io::Write;
25use std::path::PathBuf;
26use std::sync::Arc;
27
28use api::{
29    BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
30    BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
31    CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
32    HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse,
33    QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput,
34    SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output,
35    snapshot_payload,
36};
37pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
38use axum::body::{Body, Bytes};
39use axum::extract::DefaultBodyLimit;
40use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
41use axum::http::StatusCode;
42use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
43use axum::middleware::{self, Next};
44use axum::response::{IntoResponse, Response};
45use axum::routing::{delete, get, post};
46use axum::{Json, Router};
47use color_eyre::eyre::{Result, WrapErr, bail, eyre};
48use futures::stream;
49use omnigraph::db::{Omnigraph, ReadTarget};
50use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
51use omnigraph::storage::normalize_root_uri;
52use omnigraph_compiler::catalog::Catalog;
53use omnigraph_compiler::json_params_to_param_map;
54use omnigraph_compiler::query::parser::parse_query;
55use omnigraph_compiler::{JsonParamMode, ParamMap};
56pub use policy::{
57    PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
58    PolicyRequest, PolicyResourceKind, PolicyTestConfig,
59};
60use serde::Deserialize;
61use serde_json::Value;
62use sha2::{Digest, Sha256};
63use subtle::ConstantTimeEq;
64use tokio::net::TcpListener;
65use tokio::sync::mpsc;
66use tower_http::trace::TraceLayer;
67use tracing::{error, info, warn};
68use tracing_subscriber::EnvFilter;
69use utoipa::OpenApi;
70use utoipa::openapi::path::{Parameter, ParameterIn};
71use utoipa::openapi::schema::{Object, Type};
72use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
73
74type BearerTokenHash = [u8; 32];
75
76fn hash_bearer_token(token: &str) -> BearerTokenHash {
77    let digest = Sha256::digest(token.as_bytes());
78    let mut out = [0u8; 32];
79    out.copy_from_slice(&digest);
80    out
81}
82
83#[derive(OpenApi)]
84#[openapi(
85    info(
86        title = "Omnigraph API",
87        description = "HTTP API for the Omnigraph graph database",
88    ),
89    paths(
90        handlers::server_health,
91        handlers::server_graphs_list,
92        handlers::server_snapshot,
93        // deprecated; the #[deprecated] attribute on the handler
94        // surfaces as `deprecated: true` on the OpenAPI operation.
95        #[allow(deprecated)] handlers::server_read,
96        handlers::server_query,
97        handlers::server_export,
98        #[allow(deprecated)] handlers::server_change,
99        handlers::server_mutate,
100        handlers::server_list_queries,
101        handlers::server_invoke_query,
102        handlers::server_schema_apply,
103        handlers::server_schema_get,
104        handlers::server_load,
105        // deprecated; the #[deprecated] attribute on the handler surfaces as
106        // `deprecated: true` on the OpenAPI operation.
107        #[allow(deprecated)] handlers::server_ingest,
108        handlers::server_branch_list,
109        handlers::server_branch_create,
110        handlers::server_branch_delete,
111        handlers::server_branch_merge,
112        handlers::server_commit_list,
113        handlers::server_commit_show,
114    ),
115    modifiers(&SecurityAddon),
116)]
117pub struct ApiDoc;
118
119/// The canonical served OpenAPI shape (RFC-011 cluster-only): the static
120/// `ApiDoc` with every protected path nested under `/graphs/{graph_id}/…`
121/// and `cluster_`-prefixed operation ids. `/healthz` and `/graphs` stay
122/// flat. This is the single source of nesting — both the runtime
123/// `server_openapi` handler and the committed `openapi.json` derive from
124/// it, so the published spec can never describe routes the server does
125/// not serve. The handler additionally strips security in open mode; the
126/// committed spec retains it.
127pub fn served_openapi() -> utoipa::openapi::OpenApi {
128    let mut doc = ApiDoc::openapi();
129    handlers::nest_paths_under_cluster_prefix(&mut doc);
130    doc
131}
132
133struct SecurityAddon;
134
135impl utoipa::Modify for SecurityAddon {
136    fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
137        openapi
138            .components
139            .get_or_insert_with(Default::default)
140            .add_security_scheme(
141                "bearer_token",
142                SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
143            );
144    }
145}
146
/// Default request-body size cap: 1 MiB.
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1024 * 1024;
/// Larger request-body cap: 32 MiB. The name suggests it is meant for the
/// ingest upload path (router wiring is outside this view — confirm).
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 * 1024 * 1024;
/// Crate version, baked in at compile time from Cargo's `CARGO_PKG_VERSION`.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Optional source revision read from `OMNIGRAPH_SOURCE_VERSION` at compile
/// time (presumably a VCS revision set by the build pipeline — confirm).
/// `None` when that variable was unset during compilation.
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
151
152#[derive(Debug, Clone)]
153pub struct ServerConfig {
154    /// Server topology + the graphs to open at startup. RFC-011
155    /// cluster-only: the server always boots from a cluster
156    /// (`--cluster <dir | s3://…>`) and serves N graphs under cluster
157    /// routes.
158    pub mode: ServerConfigMode,
159    pub bind: String,
160    /// Operator opt-in for fully-unauthenticated dev mode (MR-723).
161    /// When neither bearer tokens nor a policy file are configured,
162    /// `serve()` refuses to start unless this is true (set via
163    /// `--unauthenticated` or `OMNIGRAPH_UNAUTHENTICATED=1`). The
164    /// motivation is that "no tokens + no policy" looks like protection
165    /// (no Cedar errors at boot) but is actually fully open — operators
166    /// who set up auth and forgot the policy file would otherwise ship
167    /// the illusion of protection.
168    pub allow_unauthenticated: bool,
169    /// Operator opt-in for fail-fast cluster boot. By default, graph-local
170    /// startup failures quarantine that graph and healthy graphs still serve.
171    /// When true, any quarantined or failed graph aborts startup.
172    pub require_all_graphs: bool,
173}
174
175/// What `load_server_settings` produces. RFC-011 cluster-only: the
176/// server always boots from a cluster's applied revision into a
177/// multi-graph deployment (N ≥ 1 graphs).
178#[derive(Debug, Clone)]
179pub enum ServerConfigMode {
180    /// Cluster boot — `--cluster <dir | s3://…>` resolves the applied
181    /// revision into per-graph startup configs plus an optional
182    /// server-level policy.
183    Multi {
184        /// Per-graph startup configs, sorted by graph id (BTreeMap
185        /// iteration order). The parallel-open loop iterates this.
186        graphs: Vec<GraphStartupConfig>,
187        /// The cluster boot source (config directory or storage root).
188        /// Kept on the mode so future runtime mutation (deferred — see
189        /// release notes) can locate the source of truth without
190        /// re-parsing CLI args.
191        config_path: PathBuf,
192        /// Server-level Cedar policy for the management endpoints
193        /// (`GET /graphs`). Wired into `GET /graphs` authorization.
194        server_policy: Option<PolicySource>,
195    },
196}
197
/// Origin of a Cedar policy bundle at startup.
///
/// Cluster-local files are used while a config is being applied. For serving,
/// the policy comes as inline, digest-verified catalog content: the catalog
/// may live on object storage, and the server must not re-read mutable state
/// after taking its snapshot.
#[derive(Debug, Clone)]
pub enum PolicySource {
    /// Policy stored in a cluster-local file at this path.
    File(PathBuf),
    /// Policy text supplied directly (already digest-verified upstream).
    Inline(String),
}
207
208/// One graph's startup-time configuration: id, opened URI, optional
209/// per-graph policy source. Constructed by `load_server_settings`
210/// in multi mode; consumed by `serve`'s parallel open loop.
211#[derive(Debug, Clone)]
212pub struct GraphStartupConfig {
213    pub graph_id: String,
214    pub uri: String,
215    pub policy: Option<PolicySource>,
216    /// Pre-resolved embedding config from an applied cluster provider profile.
217    /// Legacy config paths leave this unset and continue to use env resolution.
218    pub embedding: Option<omnigraph::embedding::EmbeddingConfig>,
219    /// Per-graph stored-query registry, loaded and identity-checked at
220    /// settings-build time; type-checked against the schema when this
221    /// graph's engine opens.
222    pub queries: QueryRegistry,
223}
224
225/// Runtime routing for the server (RFC-011 cluster-only). Every
226/// deployment serves cluster routes (`/graphs/{graph_id}/...`) backed by
227/// a registry of N graphs (N ≥ 1). The single-graph convenience
228/// constructors build a one-graph registry keyed by `default`; the
229/// cluster boot path builds an N-graph registry. There is no longer a
230/// flat-route mode.
231///
232/// `config_path` is the boot source (the cluster directory or storage
233/// root); preserved here so future runtime mutation (deferred) can find
234/// the source of truth without re-parsing CLI args. The server treats
235/// the source as operator-owned and never writes it.
236///
237/// All handler bodies are mode-agnostic — the routing middleware
238/// (`resolve_graph_handle`) injects `Arc<GraphHandle>` as a request
239/// extension by looking up the `{graph_id}` URL segment in the registry.
240#[derive(Clone)]
241pub struct GraphRouting {
242    pub registry: Arc<GraphRegistry>,
243    pub config_path: Option<PathBuf>,
244}
245
246#[derive(Clone)]
247pub struct AppState {
248    /// Runtime routing — the single source of truth for where each
249    /// request's graph lives. Single mode holds the handle directly;
250    /// multi mode holds the registry + config path. Both arms are
251    /// the same shape from a handler's perspective: middleware
252    /// extracts an `Arc<GraphHandle>` and injects it as a request
253    /// extension.
254    routing: GraphRouting,
255    /// Per-actor admission control. Process-wide (not per-graph) —
256    /// see MR-668 decision Q6.
257    workload: Arc<workload::WorkloadController>,
258    bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
259    /// Server-level Cedar policy. Used by management endpoints (`GET
260    /// /graphs`) which act on the registry resource, not on a per-graph
261    /// resource. Loaded from the cluster-scoped policy binding when
262    /// configured. Per-graph policies live on each `GraphHandle.policy`.
263    server_policy: Option<Arc<PolicyEngine>>,
264}
265
266struct ExportStreamWriter {
267    sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
268}
269
270impl Write for ExportStreamWriter {
271    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
272        self.sender
273            .send(Ok(Bytes::copy_from_slice(buf)))
274            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
275        Ok(buf.len())
276    }
277
278    fn flush(&mut self) -> io::Result<()> {
279        Ok(())
280    }
281}
282
283#[derive(Debug)]
284pub struct ApiError {
285    status: StatusCode,
286    code: ErrorCode,
287    message: String,
288    merge_conflicts: Vec<api::MergeConflictOutput>,
289    manifest_conflict: Option<api::ManifestConflictOutput>,
290}
291
292impl AppState {
293    /// Canonical single-mode constructor. Every other `new_*` / `open_*`
294    /// helper is a thin convenience wrapper around this one. Builds the
295    /// engine + per-graph policy through `build_single_mode`, which
296    /// applies `Omnigraph::with_policy` so HTTP-layer and engine-layer
297    /// policy can never diverge — there is no "policy installed on HTTP
298    /// but not on engine" representable state (closes the prior
299    /// `with_policy_engine` footgun that reused the engine `Arc`
300    /// without re-applying `with_policy`).
301    pub fn new_single(
302        uri: String,
303        db: Omnigraph,
304        bearer_tokens: Vec<(String, String)>,
305        policy_engine: Option<PolicyEngine>,
306        workload: workload::WorkloadController,
307    ) -> Self {
308        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
309        let per_graph_policy = policy_engine.map(Arc::new);
310        Self::build_single_mode(
311            uri,
312            db,
313            bearer_tokens,
314            per_graph_policy,
315            Arc::new(workload),
316            None,
317        )
318    }
319
320    /// Like `new_single`, but attaches a pre-validated stored-query
321    /// registry. Private — the production single-mode boot path
322    /// (`open_single_with_queries`) is the only caller; every public
323    /// `new_*` constructor builds with no stored queries.
324    fn new_single_with_queries(
325        uri: String,
326        db: Omnigraph,
327        bearer_tokens: Vec<(String, String)>,
328        policy_engine: Option<PolicyEngine>,
329        workload: workload::WorkloadController,
330        queries: Option<Arc<QueryRegistry>>,
331    ) -> Self {
332        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
333        let per_graph_policy = policy_engine.map(Arc::new);
334        Self::build_single_mode(
335            uri,
336            db,
337            bearer_tokens,
338            per_graph_policy,
339            Arc::new(workload),
340            queries,
341        )
342    }
343
344    pub fn new(uri: String, db: Omnigraph) -> Self {
345        Self::new_single(
346            uri,
347            db,
348            Vec::new(),
349            None,
350            workload::WorkloadController::from_env(),
351        )
352    }
353
354    pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
355        let bearer_tokens = normalize_bearer_token(bearer_token)
356            .into_iter()
357            .map(|token| ("default".to_string(), token))
358            .collect();
359        Self::new_with_bearer_tokens(uri, db, bearer_tokens)
360    }
361
362    pub fn new_with_bearer_tokens(
363        uri: String,
364        db: Omnigraph,
365        bearer_tokens: Vec<(String, String)>,
366    ) -> Self {
367        Self::new_single(
368            uri,
369            db,
370            bearer_tokens,
371            None,
372            workload::WorkloadController::from_env(),
373        )
374    }
375
376    pub fn new_with_bearer_tokens_and_policy(
377        uri: String,
378        db: Omnigraph,
379        bearer_tokens: Vec<(String, String)>,
380        policy_engine: Option<PolicyEngine>,
381    ) -> Self {
382        Self::new_single(
383            uri,
384            db,
385            bearer_tokens,
386            policy_engine,
387            workload::WorkloadController::from_env(),
388        )
389    }
390
391    /// Construct with a caller-provided [`workload::WorkloadController`].
392    /// Tests and benches use this to override per-actor caps without
393    /// mutating global env vars (unsafe in Rust 2024 once the async
394    /// runtime is up — `setenv` isn't thread-safe). For tests that also
395    /// need a custom `PolicyEngine`, use [`new_single`] directly.
396    pub fn new_with_workload(
397        uri: String,
398        db: Omnigraph,
399        bearer_tokens: Vec<(String, String)>,
400        workload: workload::WorkloadController,
401    ) -> Self {
402        Self::new_single(uri, db, bearer_tokens, None, workload)
403    }
404
405    pub async fn open(uri: impl Into<String>) -> Result<Self> {
406        Self::open_with_bearer_token(uri, None).await
407    }
408
409    pub async fn open_with_bearer_token(
410        uri: impl Into<String>,
411        bearer_token: Option<String>,
412    ) -> Result<Self> {
413        let bearer_tokens = normalize_bearer_token(bearer_token)
414            .into_iter()
415            .map(|token| ("default".to_string(), token))
416            .collect();
417        Self::open_with_bearer_tokens(uri, bearer_tokens).await
418    }
419
420    pub async fn open_with_bearer_tokens(
421        uri: impl Into<String>,
422        bearer_tokens: Vec<(String, String)>,
423    ) -> Result<Self> {
424        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
425        let db = Omnigraph::open(&uri).await?;
426        Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
427    }
428
429    pub async fn open_with_bearer_tokens_and_policy(
430        uri: impl Into<String>,
431        bearer_tokens: Vec<(String, String)>,
432        policy_file: Option<&PathBuf>,
433    ) -> Result<Self> {
434        Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default())
435            .await
436    }
437
438    /// Single-mode boot with a stored-query registry: open the engine,
439    /// **type-check the registry against the live schema and refuse to
440    /// start on a breakage** (same posture as bad policy YAML), log
441    /// non-blocking warnings, then attach the registry to the handle.
442    /// With an empty registry the check is a no-op and no registry is
443    /// attached — that is the path `open_with_bearer_tokens_and_policy`
444    /// (no stored queries) takes.
445    pub async fn open_single_with_queries(
446        uri: impl Into<String>,
447        bearer_tokens: Vec<(String, String)>,
448        policy_file: Option<&PathBuf>,
449        queries: QueryRegistry,
450    ) -> Result<Self> {
451        Self::open_single_with_queries_for_graph_id(uri, bearer_tokens, policy_file, queries, None)
452            .await
453    }
454
455    async fn open_single_with_queries_for_graph_id(
456        uri: impl Into<String>,
457        bearer_tokens: Vec<(String, String)>,
458        policy_file: Option<&PathBuf>,
459        queries: QueryRegistry,
460        graph_id: Option<String>,
461    ) -> Result<Self> {
462        // The "policy requires tokens" invariant is enforced once by
463        // `classify_server_runtime_state` in `serve()`, before either
464        // single-mode or multi-mode construction is reached. By the
465        // time we get here, the (policy, no-tokens) combination has
466        // already been rejected — no second bail needed.
467        let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
468        let graph_id = graph_id.unwrap_or_else(|| uri.clone());
469        let db = Omnigraph::open(&uri).await?;
470
471        // Validate the registry against the live schema and resolve it to
472        // an attachable handle (refuse boot on breakage).
473        let registry = validate_and_attach(queries, &db.catalog(), &graph_id)?;
474
475        let policy_engine = match policy_file {
476            Some(path) => Some(PolicyEngine::load_graph(path, &graph_id)?),
477            None => None,
478        };
479        Ok(Self::new_single_with_queries(
480            uri,
481            db,
482            bearer_tokens,
483            policy_engine,
484            workload::WorkloadController::from_env(),
485            registry,
486        ))
487    }
488
489    /// Single-graph convenience construction (RFC-011 cluster-only):
490    /// wraps the bare engine + per-graph policy in a `GraphHandle` keyed
491    /// by `default`, then builds a one-graph registry so the deployment
492    /// serves the same `/graphs/{graph_id}/...` cluster routes as any
493    /// other. Per-graph policy enforcement on the engine (MR-722) is
494    /// re-applied via `Omnigraph::with_policy` so HTTP and engine layers
495    /// can never diverge.
496    fn build_single_mode(
497        uri: String,
498        db: Omnigraph,
499        bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
500        policy_engine: Option<Arc<PolicyEngine>>,
501        workload: Arc<workload::WorkloadController>,
502        queries: Option<Arc<QueryRegistry>>,
503    ) -> Self {
504        // Engine-layer policy gate (MR-722). With a per-graph policy
505        // installed, every `_as` writer on `Omnigraph` calls into the
506        // PolicyChecker. HTTP-layer `authorize_request` is the first
507        // gate; engine-layer is the redundant-but-correct backstop.
508        let db = if let Some(policy) = policy_engine.as_ref() {
509            let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
510            db.with_policy(checker)
511        } else {
512            db
513        };
514        // The convenience constructors address the single graph by the
515        // reserved id `default` — both the registry key and the URL
516        // segment (`/graphs/default/...`).
517        let uri = normalize_root_uri(&uri).unwrap_or(uri);
518        let graph_id = GraphId::try_from("default").expect("'default' is a valid GraphId");
519        let key = GraphKey::cluster(graph_id);
520        let handle = Arc::new(GraphHandle {
521            key,
522            uri,
523            engine: Arc::new(db),
524            policy: policy_engine,
525            queries,
526        });
527        let registry = Arc::new(
528            GraphRegistry::from_handles(vec![handle])
529                .expect("a single handle never collides on graph id"),
530        );
531        Self {
532            routing: GraphRouting {
533                registry,
534                config_path: None,
535            },
536            workload,
537            bearer_tokens,
538            server_policy: None,
539        }
540    }
541
542    /// Multi-mode constructor — used by the startup loop. Operators
543    /// reach this by invoking `omnigraph-server --cluster <dir|s3://...>`.
544    ///
545    /// Caller supplies the already-opened `GraphHandle`s and (optionally)
546    /// the path to the source cluster. `server_policy` is loaded from the
547    /// cluster-scoped policy binding if configured.
548    pub fn new_multi(
549        handles: Vec<Arc<GraphHandle>>,
550        bearer_tokens: Vec<(String, String)>,
551        server_policy: Option<PolicyEngine>,
552        workload: workload::WorkloadController,
553        config_path: Option<PathBuf>,
554    ) -> std::result::Result<Self, InsertError> {
555        let bearer_tokens = hash_bearer_tokens(bearer_tokens);
556        let registry = Arc::new(GraphRegistry::from_handles(handles)?);
557        Ok(Self {
558            routing: GraphRouting {
559                registry,
560                config_path,
561            },
562            workload: Arc::new(workload),
563            bearer_tokens,
564            server_policy: server_policy.map(Arc::new),
565        })
566    }
567
568    /// Runtime routing accessor. Handlers don't typically inspect this —
569    /// they extract `Arc<GraphHandle>` via the routing middleware — but
570    /// `server_graphs_list` reads the registry through it.
571    pub fn routing(&self) -> &GraphRouting {
572        &self.routing
573    }
574
575    fn requires_bearer_auth(&self) -> bool {
576        if !self.bearer_tokens.is_empty() {
577            return true;
578        }
579        if self.server_policy.is_some() {
580            return true;
581        }
582        // Any per-graph policy also requires auth — otherwise the
583        // policy gate would receive unauthenticated requests. Reading
584        // the cached `any_per_graph_policy` flag off the registry
585        // snapshot is O(1).
586        self.routing.registry.snapshot_ref().any_per_graph_policy
587    }
588
589    fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
590        // Hash the incoming token and compare against every stored digest in
591        // constant time. Iterate all entries unconditionally so total work —
592        // and therefore response timing — doesn't depend on which slot matches.
593        let provided_hash = hash_bearer_token(provided_token);
594        let mut matched: Option<Arc<str>> = None;
595        for (hash, actor) in self.bearer_tokens.iter() {
596            if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
597                matched = Some(Arc::clone(actor));
598            }
599        }
600        matched.map(ResolvedActor::cluster_static)
601    }
602}
603
604fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
605    let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
606        .into_iter()
607        .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
608        .collect();
609    Arc::from(tokens)
610}
611
612impl ApiError {
613    pub fn unauthorized(message: impl Into<String>) -> Self {
614        Self {
615            status: StatusCode::UNAUTHORIZED,
616            code: ErrorCode::Unauthorized,
617            message: message.into(),
618            merge_conflicts: Vec::new(),
619            manifest_conflict: None,
620        }
621    }
622
623    pub fn forbidden(message: impl Into<String>) -> Self {
624        Self {
625            status: StatusCode::FORBIDDEN,
626            code: ErrorCode::Forbidden,
627            message: message.into(),
628            merge_conflicts: Vec::new(),
629            manifest_conflict: None,
630        }
631    }
632
633    pub fn bad_request(message: impl Into<String>) -> Self {
634        Self {
635            status: StatusCode::BAD_REQUEST,
636            code: ErrorCode::BadRequest,
637            message: message.into(),
638            merge_conflicts: Vec::new(),
639            manifest_conflict: None,
640        }
641    }
642
643    pub fn not_found(message: impl Into<String>) -> Self {
644        Self {
645            status: StatusCode::NOT_FOUND,
646            code: ErrorCode::NotFound,
647            message: message.into(),
648            merge_conflicts: Vec::new(),
649            manifest_conflict: None,
650        }
651    }
652
653    /// HTTP 405 Method Not Allowed. Used when the route is mounted but
654    /// the active server mode doesn't serve it (`GET /graphs` in
655    /// single-graph mode returns this instead of 404 so clients can
656    /// distinguish "wrong context" from "no such resource").
657    pub fn method_not_allowed(message: impl Into<String>) -> Self {
658        Self {
659            status: StatusCode::METHOD_NOT_ALLOWED,
660            code: ErrorCode::MethodNotAllowed,
661            message: message.into(),
662            merge_conflicts: Vec::new(),
663            manifest_conflict: None,
664        }
665    }
666
667    pub fn conflict(message: impl Into<String>) -> Self {
668        Self {
669            status: StatusCode::CONFLICT,
670            code: ErrorCode::Conflict,
671            message: message.into(),
672            merge_conflicts: Vec::new(),
673            manifest_conflict: None,
674        }
675    }
676
677    pub fn internal(message: impl Into<String>) -> Self {
678        Self {
679            status: StatusCode::INTERNAL_SERVER_ERROR,
680            code: ErrorCode::Internal,
681            message: message.into(),
682            merge_conflicts: Vec::new(),
683            manifest_conflict: None,
684        }
685    }
686
687    /// HTTP 429 Too Many Requests — actor exceeded their per-actor
688    /// admission cap (count or byte budget). Clients should respect the
689    /// `Retry-After` header. Mapped from `RejectReason::InFlightCountExceeded`
690    /// and `RejectReason::ByteBudgetExceeded`.
691    pub fn too_many_requests(message: impl Into<String>) -> Self {
692        Self {
693            status: StatusCode::TOO_MANY_REQUESTS,
694            code: ErrorCode::TooManyRequests,
695            message: message.into(),
696            merge_conflicts: Vec::new(),
697            manifest_conflict: None,
698        }
699    }
700
701    /// Convert a `WorkloadController` rejection into the matching
702    /// `ApiError` variant.
703    pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
704        match reject {
705            workload::RejectReason::InFlightCountExceeded { .. }
706            | workload::RejectReason::ByteBudgetExceeded { .. } => {
707                Self::too_many_requests(reject.to_string())
708            }
709        }
710    }
711
712    fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
713        Self {
714            status: StatusCode::CONFLICT,
715            code: ErrorCode::Conflict,
716            message: summarize_merge_conflicts(&conflicts),
717            merge_conflicts: conflicts,
718            manifest_conflict: None,
719        }
720    }
721
722    fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
723        Self {
724            status: StatusCode::CONFLICT,
725            code: ErrorCode::Conflict,
726            message,
727            merge_conflicts: Vec::new(),
728            manifest_conflict: Some(details),
729        }
730    }
731
732    fn from_omni(err: OmniError) -> Self {
733        match err {
734            OmniError::Compiler(err) => Self::bad_request(err.to_string()),
735            OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
736            OmniError::Manifest(err) => match err.kind {
737                ManifestErrorKind::BadRequest => Self::bad_request(err.message),
738                ManifestErrorKind::NotFound => Self::not_found(err.message),
739                ManifestErrorKind::Conflict => match err.details {
740                    Some(ManifestConflictDetails::ExpectedVersionMismatch {
741                        table_key,
742                        expected,
743                        actual,
744                    }) => Self::manifest_version_conflict(
745                        err.message,
746                        api::ManifestConflictOutput {
747                            table_key,
748                            expected,
749                            actual,
750                        },
751                    ),
752                    _ => Self::conflict(err.message),
753                },
754                ManifestErrorKind::Internal => Self::internal(err.message),
755            },
756            OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
757                conflicts
758                    .iter()
759                    .map(api::MergeConflictOutput::from)
760                    .collect(),
761            ),
762            OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
763            OmniError::Io(err) => Self::internal(format!("io: {err}")),
764            // Engine-layer policy enforcement (MR-722). All denials and
765            // evaluation failures surface here as 403. The HTTP-layer
766            // `authorize_request` already distinguishes 401 (missing
767            // bearer) from 403 (policy denial), so by the time the
768            // engine gate fires, the bearer is valid — any failure from
769            // the engine is a policy outcome, not an auth one.
770            OmniError::Policy(message) => Self::forbidden(message),
771            // `Omnigraph::init` against an existing graph URI in strict
772            // mode. Not currently HTTP-reachable (POST /graphs was
773            // pulled), but mapping is wired so the variant has a
774            // single canonical translation when a future runtime
775            // create endpoint lands.
776            err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
777        }
778    }
779}
780
781fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
782    if conflicts.is_empty() {
783        return "merge conflicts".to_string();
784    }
785
786    let preview: Vec<String> = conflicts
787        .iter()
788        .take(3)
789        .map(|conflict| match conflict.row_id.as_deref() {
790            Some(row_id) => format!(
791                "{}:{} ({})",
792                conflict.table_key,
793                row_id,
794                conflict.kind.as_str()
795            ),
796            None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
797        })
798        .collect();
799
800    let suffix = if conflicts.len() > preview.len() {
801        format!("; and {} more", conflicts.len() - preview.len())
802    } else {
803        String::new()
804    };
805
806    format!("merge conflicts: {}{}", preview.join("; "), suffix)
807}
808
809/// Constant `Retry-After` value (seconds) emitted on 429 responses.
810const RETRY_AFTER_SECONDS: &str = "60";
811
812impl IntoResponse for ApiError {
813    fn into_response(self) -> Response {
814        let mut headers = axum::http::HeaderMap::new();
815        if matches!(self.code, ErrorCode::TooManyRequests) {
816            headers.insert(
817                axum::http::header::RETRY_AFTER,
818                axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
819            );
820        }
821        (
822            self.status,
823            headers,
824            Json(ErrorOutput {
825                error: self.message,
826                code: Some(self.code),
827                merge_conflicts: self.merge_conflicts,
828                manifest_conflict: self.manifest_conflict,
829            }),
830        )
831            .into_response()
832    }
833}
834
835pub fn init_tracing() {
836    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
837    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
838}
839
840/// Log each non-blocking advisory from a registry check report.
841fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
842    for warning in &report.warnings {
843        warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
844    }
845}
846
847fn validate_registry_against_catalog(
848    registry: &QueryRegistry,
849    catalog: &Catalog,
850    label: &str,
851) -> omnigraph::error::Result<()> {
852    let report = check(registry, catalog);
853    if report.has_breakages() {
854        return Err(OmniError::manifest(format_check_breakages(label, &report)));
855    }
856    log_registry_warnings(label, &report);
857    Ok(())
858}
859
860/// Validate a loaded stored-query registry against the live schema and
861/// resolve it to an attachable handle. Refuses boot on any breakage
862/// (same posture as bad policy YAML), logs the non-blocking warnings,
863/// and collapses an empty registry to `None` (nothing attached). This is
864/// the single gate every open path funnels through, so no opener can
865/// attach a registry that has not been schema-checked. `label` names the
866/// graph in messages.
867fn validate_and_attach(
868    queries: QueryRegistry,
869    catalog: &Catalog,
870    label: &str,
871) -> Result<Option<Arc<QueryRegistry>>> {
872    validate_registry_against_catalog(&queries, catalog, label)
873        .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
874    Ok(if queries.is_empty() {
875        None
876    } else {
877        Some(Arc::new(queries))
878    })
879}
880
/// Assemble the HTTP router.
///
/// Route layout:
/// - `GET /healthz` and `GET /openapi.json` are not wrapped by
///   `require_bearer_auth` here.
/// - `GET /graphs` is the management listing, wrapped by `require_bearer_auth`.
/// - `/graphs/{graph_id}/...` holds the per-graph routes, wrapped by
///   `require_bearer_auth` and `resolve_graph_handle`.
///
/// Every route gets a `DefaultBodyLimit` of `DEFAULT_REQUEST_BODY_LIMIT_BYTES`
/// and a `TraceLayer`. `/load` and `/ingest` set their own body limit of
/// `INGEST_REQUEST_BODY_LIMIT_BYTES` on the route itself.
pub fn build_app(state: AppState) -> Router {
    // Per-graph protected routes, nested below under `/graphs/{graph_id}`.
    // Two `route_layer` middlewares wrap them. In axum the layer added last
    // is the outermost, so at request time they run in this order:
    //   1. `require_bearer_auth` (added last) — extracts the bearer token
    //      and injects `ResolvedActor` (or rejects 401).
    //   2. `resolve_graph_handle` (added first) — injects the
    //      `Arc<GraphHandle>` looked up from the `{graph_id}` URI segment.
    // Because they are `route_layer`s, they only run for requests that
    // match one of these routes.
    let per_graph_protected = Router::new()
        .route("/snapshot", get(server_snapshot))
        .route("/export", post(server_export))
        // /read and /change are kept indefinitely for back-compat;
        // their handlers carry #[deprecated] so the OpenAPI operation is
        // flagged and their responses include RFC 9745 Deprecation +
        // RFC 8288 Link headers. Suppress the call-site warning for the
        // route registration itself.
        .route(
            "/read",
            post({
                #[allow(deprecated)]
                server_read
            }),
        )
        .route("/query", post(server_query))
        .route(
            "/change",
            post({
                #[allow(deprecated)]
                server_change
            }),
        )
        .route("/mutate", post(server_mutate))
        .route("/queries", get(server_list_queries))
        .route("/queries/{name}", post(server_invoke_query))
        .route("/schema", get(server_schema_get))
        .route("/schema/apply", post(server_schema_apply))
        // Bulk-load routes carry their own body limit. It sits closer to the
        // handler than the router-wide default applied at the bottom of
        // this function, so it takes precedence.
        .route(
            "/load",
            post(server_load).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
        )
        // /ingest is the deprecated alias of /load; its handler carries
        // #[deprecated] (OpenAPI operation flagged) and emits RFC 9745
        // Deprecation + RFC 8288 Link headers. Suppress the call-site warning.
        .route(
            "/ingest",
            post({
                #[allow(deprecated)]
                server_ingest
            })
            .layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
        )
        .route(
            "/branches",
            get(server_branch_list).post(server_branch_create),
        )
        .route("/branches/{branch}", delete(server_branch_delete))
        .route("/branches/merge", post(server_branch_merge))
        .route("/commits", get(server_commit_list))
        .route("/commits/{commit_id}", get(server_commit_show))
        // Inner layer: runs second, after bearer auth has succeeded.
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            resolve_graph_handle,
        ))
        // Outer layer: runs first for every matched per-graph route.
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    // Management endpoints (`GET /graphs`) live alongside the per-graph
    // router. They go through bearer auth but NOT through
    // `resolve_graph_handle` — they operate on the registry directly.
    //
    // Runtime add/remove (`POST /graphs`, `DELETE /graphs/{id}`) is not
    // exposed — operators run `cluster apply` and restart.
    let management = Router::new()
        .route("/graphs", get(server_graphs_list))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    // RFC-011 cluster-only: per-graph routes always nest under
    // `/graphs/{graph_id}/...`; there are no flat single-graph routes.
    let protected: Router<AppState> = Router::new()
        .nest("/graphs/{graph_id}", per_graph_protected)
        .merge(management);

    // Health and OpenAPI routes are added here without bearer auth. The two
    // `.layer` calls below apply to every route, protected ones included.
    Router::new()
        .route("/healthz", get(server_health))
        .route("/openapi.json", get(server_openapi))
        .merge(protected)
        .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}
976
977pub async fn serve(config: ServerConfig) -> Result<()> {
978    let token_source = resolve_token_source().await?;
979    info!(source = token_source.name(), "loaded bearer token source");
980    let tokens = token_source.load().await?;
981
982    // For runtime-state classification, "any policy configured" means
983    // either the top-level/single-mode policy file OR a server-level
984    // policy OR any per-graph policy file. Mirrors the
985    // `requires_bearer_auth` semantics on AppState.
986    let has_policy_configured = match &config.mode {
987        ServerConfigMode::Multi {
988            graphs,
989            server_policy,
990            ..
991        } => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()),
992    };
993    let runtime_state = classify_server_runtime_state(
994        !tokens.is_empty(),
995        has_policy_configured,
996        config.allow_unauthenticated,
997    )?;
998    match runtime_state {
999        ServerRuntimeState::Open => warn!(
1000            "running with --unauthenticated: no bearer tokens, no policy file, all \
1001             requests permitted. This is for local dev only — do not expose to a \
1002             network you don't fully trust."
1003        ),
1004        ServerRuntimeState::DefaultDeny => warn!(
1005            "bearer tokens are configured but no policy file is set — running in \
1006             default-deny mode (only `read` actions are permitted for authenticated \
1007             actors). Configure a graph or cluster policy bundle in the cluster config, \
1008             run `omnigraph cluster apply`, and restart to enable Cedar rules."
1009        ),
1010        ServerRuntimeState::PolicyEnabled => {}
1011    }
1012
1013    let bind = config.bind.clone();
1014    let state = match config.mode {
1015        ServerConfigMode::Multi {
1016            graphs,
1017            config_path,
1018            server_policy,
1019        } => {
1020            info!(
1021                bind = %bind,
1022                mode = "cluster",
1023                graph_count = graphs.len(),
1024                config = %config_path.display(),
1025                "serving omnigraph"
1026            );
1027            open_multi_graph_state(
1028                graphs,
1029                tokens,
1030                server_policy.as_ref(),
1031                config_path,
1032                config.require_all_graphs,
1033            )
1034            .await?
1035        }
1036    };
1037
1038    let listener = TcpListener::bind(&bind).await?;
1039    axum::serve(listener, build_app(state))
1040        .with_graceful_shutdown(shutdown_signal())
1041        .await?;
1042    Ok(())
1043}
1044
1045/// Load a graph-scoped policy bundle from either source kind.
1046fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngine> {
1047    match source {
1048        PolicySource::File(path) => Ok(PolicyEngine::load_graph(path, graph_id)?),
1049        PolicySource::Inline(text) => Ok(PolicyEngine::load_graph_from_source(text, graph_id)?),
1050    }
1051}
1052
1053/// Parallel open of every graph in the startup config, with bounded
1054/// concurrency (`buffer_unordered(4)`). Graph-specific open failures
1055/// quarantine that graph; startup succeeds as long as at least one graph
1056/// opens.
1057///
1058/// The bound 4 is a rule-of-thumb for I/O-bound work. At N ≤ 10 this
1059/// trades startup latency for a small amount of concurrent S3 / Lance
1060/// open pressure.
1061pub async fn open_multi_graph_state(
1062    graphs: Vec<GraphStartupConfig>,
1063    tokens: Vec<(String, String)>,
1064    server_policy_source: Option<&PolicySource>,
1065    config_path: PathBuf,
1066    require_all_graphs: bool,
1067) -> Result<AppState> {
1068    use futures::StreamExt;
1069
1070    if graphs.is_empty() {
1071        bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1072    }
1073
1074    // Server-level policy (loaded once, applies to management endpoints).
1075    // The placeholder graph_id `"server"` is the sentinel the Cedar
1076    // resource-model refactor maps to the singleton
1077    // `Omnigraph::Server::"root"` entity at evaluation time.
1078    let server_policy = match server_policy_source {
1079        Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
1080        Some(PolicySource::Inline(source)) => Some(PolicyEngine::load_server_from_source(source)?),
1081        None => None,
1082    };
1083
1084    let configured_graphs = graphs.len();
1085    let results = futures::stream::iter(graphs.into_iter())
1086        .map(|cfg| async move {
1087            let graph_id = cfg.graph_id.clone();
1088            open_single_graph(cfg).await.map_err(|err| (graph_id, err))
1089        })
1090        .buffer_unordered(4)
1091        .collect::<Vec<_>>()
1092        .await;
1093    let mut handles = Vec::new();
1094    let mut failed = 0usize;
1095    for result in results {
1096        match result {
1097            Ok(handle) => handles.push(handle),
1098            Err((graph_id, err)) => {
1099                failed += 1;
1100                warn!(
1101                    graph_id = %graph_id,
1102                    error = %err,
1103                    "graph quarantined during startup"
1104                );
1105            }
1106        }
1107    }
1108    if require_all_graphs && failed > 0 {
1109        bail!(
1110            "strict multi-graph startup requires every graph to open ({} configured, {} failed)",
1111            configured_graphs,
1112            failed
1113        );
1114    }
1115    if handles.is_empty() {
1116        bail!(
1117            "no healthy graphs opened from multi-graph startup config ({} configured, {} failed)",
1118            configured_graphs,
1119            failed
1120        );
1121    }
1122
1123    let workload = workload::WorkloadController::from_env();
1124    let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1125        .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1126    Ok(state)
1127}
1128
1129/// Open one graph and wrap it in a `GraphHandle`. Used at startup by
1130/// `open_multi_graph_state`.
1131async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1132    let graph_id = GraphId::try_from(cfg.graph_id.clone())
1133        .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1134    let uri = normalize_root_uri(&cfg.uri)
1135        .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1136
1137    let db = Omnigraph::open(&uri)
1138        .await
1139        .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1140    let db = if let Some(embedding) = cfg.embedding {
1141        db.with_embedding_config(Arc::new(embedding))
1142    } else {
1143        db
1144    };
1145
1146    // Validate this graph's stored queries against the live schema and
1147    // resolve them to an attachable handle (refuse boot on breakage).
1148    // Done before the policy match rebinds `db`; the catalog handle is an
1149    // owned `Arc`, so no borrow of `db` survives into the match.
1150    let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
1151
1152    let (policy_arc, db) = match &cfg.policy {
1153        Some(source) => {
1154            let policy = load_graph_policy(source, graph_id.as_str())?;
1155            let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1156            let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1157            (Some(policy_arc), db.with_policy(checker))
1158        }
1159        None => (None, db),
1160    };
1161
1162    Ok(Arc::new(GraphHandle {
1163        key: GraphKey::cluster(graph_id),
1164        uri,
1165        engine: Arc::new(db),
1166        policy: policy_arc,
1167        queries,
1168    }))
1169}
1170
1171async fn shutdown_signal() {
1172    if let Err(err) = tokio::signal::ctrl_c().await {
1173        error!(error = %err, "failed to install ctrl-c handler");
1174        return;
1175    }
1176    info!("shutdown signal received");
1177}