1pub mod api;
2mod handlers;
3mod settings;
4use handlers::*;
5use settings::*;
6pub use settings::{ServerRuntimeState, classify_server_runtime_state, load_server_settings};
7pub mod auth;
8pub mod graph_id;
9pub mod identity;
10pub mod policy;
11pub mod queries;
12pub mod registry;
13pub mod workload;
14
15pub use graph_id::GraphId;
16pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
17pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
18
19use crate::queries::{QueryRegistry, check, format_check_breakages};
20
21use std::collections::{BTreeMap, HashMap, HashSet};
22use std::fs;
23use std::io;
24use std::io::Write;
25use std::path::PathBuf;
26use std::sync::Arc;
27
28use api::{
29 BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
30 BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
31 CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
32 HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest, InvokeStoredQueryResponse,
33 QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest, SchemaApplyOutput,
34 SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output, schema_apply_output,
35 snapshot_payload,
36};
37pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
38use axum::body::{Body, Bytes};
39use axum::extract::DefaultBodyLimit;
40use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
41use axum::http::StatusCode;
42use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
43use axum::middleware::{self, Next};
44use axum::response::{IntoResponse, Response};
45use axum::routing::{delete, get, post};
46use axum::{Json, Router};
47use color_eyre::eyre::{Result, WrapErr, bail, eyre};
48use futures::stream;
49use omnigraph::db::{Omnigraph, ReadTarget};
50use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
51use omnigraph::storage::normalize_root_uri;
52use omnigraph_compiler::catalog::Catalog;
53use omnigraph_compiler::json_params_to_param_map;
54use omnigraph_compiler::query::parser::parse_query;
55use omnigraph_compiler::{JsonParamMode, ParamMap};
56pub use policy::{
57 PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
58 PolicyRequest, PolicyResourceKind, PolicyTestConfig,
59};
60use serde::Deserialize;
61use serde_json::Value;
62use sha2::{Digest, Sha256};
63use subtle::ConstantTimeEq;
64use tokio::net::TcpListener;
65use tokio::sync::mpsc;
66use tower_http::trace::TraceLayer;
67use tracing::{error, info, warn};
68use tracing_subscriber::EnvFilter;
69use utoipa::OpenApi;
70use utoipa::openapi::path::{Parameter, ParameterIn};
71use utoipa::openapi::schema::{Object, Type};
72use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
73
/// SHA-256 digest of a bearer token: the 32-byte array produced by
/// `hash_bearer_token`. `AppState` stores these digests rather than the raw
/// token strings.
type BearerTokenHash = [u8; 32];
75
76fn hash_bearer_token(token: &str) -> BearerTokenHash {
77 let digest = Sha256::digest(token.as_bytes());
78 let mut out = [0u8; 32];
79 out.copy_from_slice(&digest);
80 out
81}
82
// OpenAPI document root for the server.
//
// `#[derive(OpenApi)]` generates `ApiDoc::openapi()`, which `served_openapi`
// calls to build the specification. The `paths(...)` list names every handler
// that should appear in the document, and `SecurityAddon` is run as a modifier
// to register the `bearer_token` security scheme.
//
// Plain `//` comments are used here on purpose so that no `#[doc]` attribute
// is handed to the derive macro.
#[derive(OpenApi)]
#[openapi(
    info(
        title = "Omnigraph API",
        description = "HTTP API for the Omnigraph graph database",
    ),
    paths(
        handlers::server_health,
        handlers::server_graphs_list,
        handlers::server_snapshot,
        // Entries carrying `#[allow(deprecated)]` reference handlers that are
        // presumably marked `#[deprecated]` in `handlers` — confirm there.
        #[allow(deprecated)] handlers::server_read,
        handlers::server_query,
        handlers::server_export,
        #[allow(deprecated)] handlers::server_change,
        handlers::server_mutate,
        handlers::server_list_queries,
        handlers::server_invoke_query,
        handlers::server_schema_apply,
        handlers::server_schema_get,
        handlers::server_load,
        #[allow(deprecated)] handlers::server_ingest,
        handlers::server_branch_list,
        handlers::server_branch_create,
        handlers::server_branch_delete,
        handlers::server_branch_merge,
        handlers::server_commit_list,
        handlers::server_commit_show,
    ),
    modifiers(&SecurityAddon),
)]
pub struct ApiDoc;
118
119pub fn served_openapi() -> utoipa::openapi::OpenApi {
128 let mut doc = ApiDoc::openapi();
129 handlers::nest_paths_under_cluster_prefix(&mut doc);
130 doc
131}
132
/// OpenAPI modifier referenced from `ApiDoc`'s `modifiers(...)` list; its
/// `utoipa::Modify` impl registers the `bearer_token` security scheme.
struct SecurityAddon;
134
135impl utoipa::Modify for SecurityAddon {
136 fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
137 openapi
138 .components
139 .get_or_insert_with(Default::default)
140 .add_security_scheme(
141 "bearer_token",
142 SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
143 );
144 }
145}
146
/// Request body cap (1 MiB) that `build_app` installs on the whole router.
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1_048_576;
/// Larger request body cap (32 MiB) that `build_app` installs on the `/load`
/// and `/ingest` routes only.
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 * 1024 * 1024;
/// Crate version, captured from `CARGO_PKG_VERSION` at compile time.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Value of the `OMNIGRAPH_SOURCE_VERSION` environment variable at compile
/// time, or `None` when it was not set for the build.
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
151
/// Startup configuration consumed by [`serve`].
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Which graphs to serve and where their configuration came from.
    pub mode: ServerConfigMode,
    /// Socket address string handed to `TcpListener::bind`.
    pub bind: String,
    /// Forwarded to `classify_server_runtime_state` together with whether
    /// tokens and policies are configured, to decide the runtime auth state.
    pub allow_unauthenticated: bool,
    /// When `true`, startup fails if any configured graph cannot be opened;
    /// otherwise failing graphs are logged and skipped.
    pub require_all_graphs: bool,
}
174
/// Serving mode selected at startup. Only the multi-graph mode exists here.
#[derive(Debug, Clone)]
pub enum ServerConfigMode {
    /// Serve one or more graphs described by a configuration file.
    Multi {
        /// Graphs to open at startup.
        graphs: Vec<GraphStartupConfig>,
        /// Path of the configuration the graphs were read from; logged at
        /// startup and kept in [`GraphRouting::config_path`].
        config_path: PathBuf,
        /// Optional server-level policy, loaded with
        /// `PolicyEngine::load_server` / `load_server_from_source`.
        server_policy: Option<PolicySource>,
    },
}
197
/// Where a policy definition is read from.
#[derive(Debug, Clone)]
pub enum PolicySource {
    /// Policy stored in a file at the given path.
    File(PathBuf),
    /// Policy text supplied directly as a string.
    Inline(String),
}
207
/// Everything needed to open one graph at startup (see `open_single_graph`).
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
    /// Graph identifier; must parse as a [`GraphId`].
    pub graph_id: String,
    /// Storage location; normalised with `normalize_root_uri` before opening.
    pub uri: String,
    /// Optional per-graph policy attached to the opened engine.
    pub policy: Option<PolicySource>,
    /// Optional embedding configuration applied to the opened engine.
    pub embedding: Option<omnigraph::embedding::EmbeddingConfig>,
    /// Stored queries, validated against the graph's catalog at startup.
    pub queries: QueryRegistry,
}
224
/// Graph lookup state shared by the request handlers.
#[derive(Clone)]
pub struct GraphRouting {
    /// Registry of the graph handles that were opened.
    pub registry: Arc<GraphRegistry>,
    /// Configuration file path passed to `AppState::new_multi`; `None` for
    /// states built through the single-graph constructors.
    pub config_path: Option<PathBuf>,
}
245
/// Shared application state handed to the axum router via `with_state`.
#[derive(Clone)]
pub struct AppState {
    /// Graph registry plus the optional configuration path.
    routing: GraphRouting,
    /// Workload controller shared by all clones of the state.
    workload: Arc<workload::WorkloadController>,
    /// `(token digest, actor name)` pairs; raw token strings are not kept.
    bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
    /// Server-level policy engine; `None` when no server policy is configured.
    server_policy: Option<Arc<PolicyEngine>>,
}
265
/// `std::io::Write` adapter that forwards every written buffer as a `Bytes`
/// chunk over an unbounded channel.
struct ExportStreamWriter {
    /// Sending half of the channel; each item is a chunk or an I/O error.
    sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
}
269
270impl Write for ExportStreamWriter {
271 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
272 self.sender
273 .send(Ok(Bytes::copy_from_slice(buf)))
274 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
275 Ok(buf.len())
276 }
277
278 fn flush(&mut self) -> io::Result<()> {
279 Ok(())
280 }
281}
282
/// Error returned by request handlers; rendered as a JSON `ErrorOutput` body
/// by its `IntoResponse` impl.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status code of the response.
    status: StatusCode,
    /// Error code placed in the response body.
    code: ErrorCode,
    /// Human-readable message placed in the body's `error` field.
    message: String,
    /// Per-row merge conflicts; empty unless built from a merge conflict.
    merge_conflicts: Vec<api::MergeConflictOutput>,
    /// Expected-version mismatch details; `None` for every other error.
    manifest_conflict: Option<api::ManifestConflictOutput>,
}
291
292impl AppState {
293 pub fn new_single(
302 uri: String,
303 db: Omnigraph,
304 bearer_tokens: Vec<(String, String)>,
305 policy_engine: Option<PolicyEngine>,
306 workload: workload::WorkloadController,
307 ) -> Self {
308 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
309 let per_graph_policy = policy_engine.map(Arc::new);
310 Self::build_single_mode(
311 uri,
312 db,
313 bearer_tokens,
314 per_graph_policy,
315 Arc::new(workload),
316 None,
317 )
318 }
319
320 fn new_single_with_queries(
325 uri: String,
326 db: Omnigraph,
327 bearer_tokens: Vec<(String, String)>,
328 policy_engine: Option<PolicyEngine>,
329 workload: workload::WorkloadController,
330 queries: Option<Arc<QueryRegistry>>,
331 ) -> Self {
332 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
333 let per_graph_policy = policy_engine.map(Arc::new);
334 Self::build_single_mode(
335 uri,
336 db,
337 bearer_tokens,
338 per_graph_policy,
339 Arc::new(workload),
340 queries,
341 )
342 }
343
344 pub fn new(uri: String, db: Omnigraph) -> Self {
345 Self::new_single(
346 uri,
347 db,
348 Vec::new(),
349 None,
350 workload::WorkloadController::from_env(),
351 )
352 }
353
354 pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
355 let bearer_tokens = normalize_bearer_token(bearer_token)
356 .into_iter()
357 .map(|token| ("default".to_string(), token))
358 .collect();
359 Self::new_with_bearer_tokens(uri, db, bearer_tokens)
360 }
361
362 pub fn new_with_bearer_tokens(
363 uri: String,
364 db: Omnigraph,
365 bearer_tokens: Vec<(String, String)>,
366 ) -> Self {
367 Self::new_single(
368 uri,
369 db,
370 bearer_tokens,
371 None,
372 workload::WorkloadController::from_env(),
373 )
374 }
375
376 pub fn new_with_bearer_tokens_and_policy(
377 uri: String,
378 db: Omnigraph,
379 bearer_tokens: Vec<(String, String)>,
380 policy_engine: Option<PolicyEngine>,
381 ) -> Self {
382 Self::new_single(
383 uri,
384 db,
385 bearer_tokens,
386 policy_engine,
387 workload::WorkloadController::from_env(),
388 )
389 }
390
391 pub fn new_with_workload(
397 uri: String,
398 db: Omnigraph,
399 bearer_tokens: Vec<(String, String)>,
400 workload: workload::WorkloadController,
401 ) -> Self {
402 Self::new_single(uri, db, bearer_tokens, None, workload)
403 }
404
405 pub async fn open(uri: impl Into<String>) -> Result<Self> {
406 Self::open_with_bearer_token(uri, None).await
407 }
408
409 pub async fn open_with_bearer_token(
410 uri: impl Into<String>,
411 bearer_token: Option<String>,
412 ) -> Result<Self> {
413 let bearer_tokens = normalize_bearer_token(bearer_token)
414 .into_iter()
415 .map(|token| ("default".to_string(), token))
416 .collect();
417 Self::open_with_bearer_tokens(uri, bearer_tokens).await
418 }
419
420 pub async fn open_with_bearer_tokens(
421 uri: impl Into<String>,
422 bearer_tokens: Vec<(String, String)>,
423 ) -> Result<Self> {
424 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
425 let db = Omnigraph::open(&uri).await?;
426 Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
427 }
428
429 pub async fn open_with_bearer_tokens_and_policy(
430 uri: impl Into<String>,
431 bearer_tokens: Vec<(String, String)>,
432 policy_file: Option<&PathBuf>,
433 ) -> Result<Self> {
434 Self::open_single_with_queries(uri, bearer_tokens, policy_file, QueryRegistry::default())
435 .await
436 }
437
438 pub async fn open_single_with_queries(
446 uri: impl Into<String>,
447 bearer_tokens: Vec<(String, String)>,
448 policy_file: Option<&PathBuf>,
449 queries: QueryRegistry,
450 ) -> Result<Self> {
451 Self::open_single_with_queries_for_graph_id(uri, bearer_tokens, policy_file, queries, None)
452 .await
453 }
454
455 async fn open_single_with_queries_for_graph_id(
456 uri: impl Into<String>,
457 bearer_tokens: Vec<(String, String)>,
458 policy_file: Option<&PathBuf>,
459 queries: QueryRegistry,
460 graph_id: Option<String>,
461 ) -> Result<Self> {
462 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
468 let graph_id = graph_id.unwrap_or_else(|| uri.clone());
469 let db = Omnigraph::open(&uri).await?;
470
471 let registry = validate_and_attach(queries, &db.catalog(), &graph_id)?;
474
475 let policy_engine = match policy_file {
476 Some(path) => Some(PolicyEngine::load_graph(path, &graph_id)?),
477 None => None,
478 };
479 Ok(Self::new_single_with_queries(
480 uri,
481 db,
482 bearer_tokens,
483 policy_engine,
484 workload::WorkloadController::from_env(),
485 registry,
486 ))
487 }
488
489 fn build_single_mode(
497 uri: String,
498 db: Omnigraph,
499 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
500 policy_engine: Option<Arc<PolicyEngine>>,
501 workload: Arc<workload::WorkloadController>,
502 queries: Option<Arc<QueryRegistry>>,
503 ) -> Self {
504 let db = if let Some(policy) = policy_engine.as_ref() {
509 let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
510 db.with_policy(checker)
511 } else {
512 db
513 };
514 let uri = normalize_root_uri(&uri).unwrap_or(uri);
518 let graph_id = GraphId::try_from("default").expect("'default' is a valid GraphId");
519 let key = GraphKey::cluster(graph_id);
520 let handle = Arc::new(GraphHandle {
521 key,
522 uri,
523 engine: Arc::new(db),
524 policy: policy_engine,
525 queries,
526 });
527 let registry = Arc::new(
528 GraphRegistry::from_handles(vec![handle])
529 .expect("a single handle never collides on graph id"),
530 );
531 Self {
532 routing: GraphRouting {
533 registry,
534 config_path: None,
535 },
536 workload,
537 bearer_tokens,
538 server_policy: None,
539 }
540 }
541
542 pub fn new_multi(
549 handles: Vec<Arc<GraphHandle>>,
550 bearer_tokens: Vec<(String, String)>,
551 server_policy: Option<PolicyEngine>,
552 workload: workload::WorkloadController,
553 config_path: Option<PathBuf>,
554 ) -> std::result::Result<Self, InsertError> {
555 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
556 let registry = Arc::new(GraphRegistry::from_handles(handles)?);
557 Ok(Self {
558 routing: GraphRouting {
559 registry,
560 config_path,
561 },
562 workload: Arc::new(workload),
563 bearer_tokens,
564 server_policy: server_policy.map(Arc::new),
565 })
566 }
567
568 pub fn routing(&self) -> &GraphRouting {
572 &self.routing
573 }
574
575 fn requires_bearer_auth(&self) -> bool {
576 if !self.bearer_tokens.is_empty() {
577 return true;
578 }
579 if self.server_policy.is_some() {
580 return true;
581 }
582 self.routing.registry.snapshot_ref().any_per_graph_policy
587 }
588
589 fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
590 let provided_hash = hash_bearer_token(provided_token);
594 let mut matched: Option<Arc<str>> = None;
595 for (hash, actor) in self.bearer_tokens.iter() {
596 if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
597 matched = Some(Arc::clone(actor));
598 }
599 }
600 matched.map(ResolvedActor::cluster_static)
601 }
602}
603
604fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
605 let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
606 .into_iter()
607 .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
608 .collect();
609 Arc::from(tokens)
610}
611
612impl ApiError {
613 pub fn unauthorized(message: impl Into<String>) -> Self {
614 Self {
615 status: StatusCode::UNAUTHORIZED,
616 code: ErrorCode::Unauthorized,
617 message: message.into(),
618 merge_conflicts: Vec::new(),
619 manifest_conflict: None,
620 }
621 }
622
623 pub fn forbidden(message: impl Into<String>) -> Self {
624 Self {
625 status: StatusCode::FORBIDDEN,
626 code: ErrorCode::Forbidden,
627 message: message.into(),
628 merge_conflicts: Vec::new(),
629 manifest_conflict: None,
630 }
631 }
632
633 pub fn bad_request(message: impl Into<String>) -> Self {
634 Self {
635 status: StatusCode::BAD_REQUEST,
636 code: ErrorCode::BadRequest,
637 message: message.into(),
638 merge_conflicts: Vec::new(),
639 manifest_conflict: None,
640 }
641 }
642
643 pub fn not_found(message: impl Into<String>) -> Self {
644 Self {
645 status: StatusCode::NOT_FOUND,
646 code: ErrorCode::NotFound,
647 message: message.into(),
648 merge_conflicts: Vec::new(),
649 manifest_conflict: None,
650 }
651 }
652
653 pub fn method_not_allowed(message: impl Into<String>) -> Self {
658 Self {
659 status: StatusCode::METHOD_NOT_ALLOWED,
660 code: ErrorCode::MethodNotAllowed,
661 message: message.into(),
662 merge_conflicts: Vec::new(),
663 manifest_conflict: None,
664 }
665 }
666
667 pub fn conflict(message: impl Into<String>) -> Self {
668 Self {
669 status: StatusCode::CONFLICT,
670 code: ErrorCode::Conflict,
671 message: message.into(),
672 merge_conflicts: Vec::new(),
673 manifest_conflict: None,
674 }
675 }
676
677 pub fn internal(message: impl Into<String>) -> Self {
678 Self {
679 status: StatusCode::INTERNAL_SERVER_ERROR,
680 code: ErrorCode::Internal,
681 message: message.into(),
682 merge_conflicts: Vec::new(),
683 manifest_conflict: None,
684 }
685 }
686
687 pub fn too_many_requests(message: impl Into<String>) -> Self {
692 Self {
693 status: StatusCode::TOO_MANY_REQUESTS,
694 code: ErrorCode::TooManyRequests,
695 message: message.into(),
696 merge_conflicts: Vec::new(),
697 manifest_conflict: None,
698 }
699 }
700
701 pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
704 match reject {
705 workload::RejectReason::InFlightCountExceeded { .. }
706 | workload::RejectReason::ByteBudgetExceeded { .. } => {
707 Self::too_many_requests(reject.to_string())
708 }
709 }
710 }
711
712 fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
713 Self {
714 status: StatusCode::CONFLICT,
715 code: ErrorCode::Conflict,
716 message: summarize_merge_conflicts(&conflicts),
717 merge_conflicts: conflicts,
718 manifest_conflict: None,
719 }
720 }
721
722 fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
723 Self {
724 status: StatusCode::CONFLICT,
725 code: ErrorCode::Conflict,
726 message,
727 merge_conflicts: Vec::new(),
728 manifest_conflict: Some(details),
729 }
730 }
731
732 fn from_omni(err: OmniError) -> Self {
733 match err {
734 OmniError::Compiler(err) => Self::bad_request(err.to_string()),
735 OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
736 OmniError::Manifest(err) => match err.kind {
737 ManifestErrorKind::BadRequest => Self::bad_request(err.message),
738 ManifestErrorKind::NotFound => Self::not_found(err.message),
739 ManifestErrorKind::Conflict => match err.details {
740 Some(ManifestConflictDetails::ExpectedVersionMismatch {
741 table_key,
742 expected,
743 actual,
744 }) => Self::manifest_version_conflict(
745 err.message,
746 api::ManifestConflictOutput {
747 table_key,
748 expected,
749 actual,
750 },
751 ),
752 _ => Self::conflict(err.message),
753 },
754 ManifestErrorKind::Internal => Self::internal(err.message),
755 },
756 OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
757 conflicts
758 .iter()
759 .map(api::MergeConflictOutput::from)
760 .collect(),
761 ),
762 OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
763 OmniError::Io(err) => Self::internal(format!("io: {err}")),
764 OmniError::Policy(message) => Self::forbidden(message),
771 err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
777 }
778 }
779}
780
781fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
782 if conflicts.is_empty() {
783 return "merge conflicts".to_string();
784 }
785
786 let preview: Vec<String> = conflicts
787 .iter()
788 .take(3)
789 .map(|conflict| match conflict.row_id.as_deref() {
790 Some(row_id) => format!(
791 "{}:{} ({})",
792 conflict.table_key,
793 row_id,
794 conflict.kind.as_str()
795 ),
796 None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
797 })
798 .collect();
799
800 let suffix = if conflicts.len() > preview.len() {
801 format!("; and {} more", conflicts.len() - preview.len())
802 } else {
803 String::new()
804 };
805
806 format!("merge conflicts: {}{}", preview.join("; "), suffix)
807}
808
/// Value of the `Retry-After` header attached to 429 (too many requests)
/// responses; kept as a string because it is used as a static header value.
const RETRY_AFTER_SECONDS: &str = "60";
811
812impl IntoResponse for ApiError {
813 fn into_response(self) -> Response {
814 let mut headers = axum::http::HeaderMap::new();
815 if matches!(self.code, ErrorCode::TooManyRequests) {
816 headers.insert(
817 axum::http::header::RETRY_AFTER,
818 axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
819 );
820 }
821 (
822 self.status,
823 headers,
824 Json(ErrorOutput {
825 error: self.message,
826 code: Some(self.code),
827 merge_conflicts: self.merge_conflicts,
828 manifest_conflict: self.manifest_conflict,
829 }),
830 )
831 .into_response()
832 }
833}
834
835pub fn init_tracing() {
836 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
837 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
838}
839
840fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
842 for warning in &report.warnings {
843 warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
844 }
845}
846
847fn validate_registry_against_catalog(
848 registry: &QueryRegistry,
849 catalog: &Catalog,
850 label: &str,
851) -> omnigraph::error::Result<()> {
852 let report = check(registry, catalog);
853 if report.has_breakages() {
854 return Err(OmniError::manifest(format_check_breakages(label, &report)));
855 }
856 log_registry_warnings(label, &report);
857 Ok(())
858}
859
860fn validate_and_attach(
868 queries: QueryRegistry,
869 catalog: &Catalog,
870 label: &str,
871) -> Result<Option<Arc<QueryRegistry>>> {
872 validate_registry_against_catalog(&queries, catalog, label)
873 .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
874 Ok(if queries.is_empty() {
875 None
876 } else {
877 Some(Arc::new(queries))
878 })
879}
880
/// Assembles the HTTP router for the server.
///
/// Layout:
/// - `/healthz` and `/openapi.json` are registered on the outer router and
///   are not wrapped by `require_bearer_auth`.
/// - `/graphs` (management) is wrapped by `require_bearer_auth`.
/// - Everything under `/graphs/{graph_id}/...` is wrapped by
///   `require_bearer_auth` and `resolve_graph_handle`.
///
/// The whole router gets the default request body limit and HTTP tracing;
/// `/load` and `/ingest` override the body limit with the larger ingest cap.
pub fn build_app(state: AppState) -> Router {
    // Per-graph API; nested under `/graphs/{graph_id}` further down.
    let per_graph_protected = Router::new()
        .route("/snapshot", get(server_snapshot))
        .route("/export", post(server_export))
        .route(
            "/read",
            post({
                // The `allow` suggests the handler is marked deprecated —
                // confirm in `handlers`.
                #[allow(deprecated)]
                server_read
            }),
        )
        .route("/query", post(server_query))
        .route(
            "/change",
            post({
                #[allow(deprecated)]
                server_change
            }),
        )
        .route("/mutate", post(server_mutate))
        .route("/queries", get(server_list_queries))
        .route("/queries/{name}", post(server_invoke_query))
        .route("/schema", get(server_schema_get))
        .route("/schema/apply", post(server_schema_apply))
        .route(
            "/load",
            // Bulk load accepts bodies up to the ingest limit.
            post(server_load).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
        )
        .route(
            "/ingest",
            post({
                #[allow(deprecated)]
                server_ingest
            })
            .layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
        )
        .route(
            "/branches",
            get(server_branch_list).post(server_branch_create),
        )
        .route("/branches/{branch}", delete(server_branch_delete))
        .route("/branches/merge", post(server_branch_merge))
        .route("/commits", get(server_commit_list))
        .route("/commits/{commit_id}", get(server_commit_show))
        // NOTE(review): with axum's layering the layer added last is the
        // outermost, so `require_bearer_auth` should run before
        // `resolve_graph_handle` — keep this order.
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            resolve_graph_handle,
        ))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    // Management endpoint: authenticated, but not tied to a single graph.
    let management = Router::new()
        .route("/graphs", get(server_graphs_list))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    let protected: Router<AppState> = Router::new()
        .nest("/graphs/{graph_id}", per_graph_protected)
        .merge(management);

    Router::new()
        .route("/healthz", get(server_health))
        .route("/openapi.json", get(server_openapi))
        .merge(protected)
        .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}
976
977pub async fn serve(config: ServerConfig) -> Result<()> {
978 let token_source = resolve_token_source().await?;
979 info!(source = token_source.name(), "loaded bearer token source");
980 let tokens = token_source.load().await?;
981
982 let has_policy_configured = match &config.mode {
987 ServerConfigMode::Multi {
988 graphs,
989 server_policy,
990 ..
991 } => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()),
992 };
993 let runtime_state = classify_server_runtime_state(
994 !tokens.is_empty(),
995 has_policy_configured,
996 config.allow_unauthenticated,
997 )?;
998 match runtime_state {
999 ServerRuntimeState::Open => warn!(
1000 "running with --unauthenticated: no bearer tokens, no policy file, all \
1001 requests permitted. This is for local dev only — do not expose to a \
1002 network you don't fully trust."
1003 ),
1004 ServerRuntimeState::DefaultDeny => warn!(
1005 "bearer tokens are configured but no policy file is set — running in \
1006 default-deny mode (only `read` actions are permitted for authenticated \
1007 actors). Configure a graph or cluster policy bundle in the cluster config, \
1008 run `omnigraph cluster apply`, and restart to enable Cedar rules."
1009 ),
1010 ServerRuntimeState::PolicyEnabled => {}
1011 }
1012
1013 let bind = config.bind.clone();
1014 let state = match config.mode {
1015 ServerConfigMode::Multi {
1016 graphs,
1017 config_path,
1018 server_policy,
1019 } => {
1020 info!(
1021 bind = %bind,
1022 mode = "cluster",
1023 graph_count = graphs.len(),
1024 config = %config_path.display(),
1025 "serving omnigraph"
1026 );
1027 open_multi_graph_state(
1028 graphs,
1029 tokens,
1030 server_policy.as_ref(),
1031 config_path,
1032 config.require_all_graphs,
1033 )
1034 .await?
1035 }
1036 };
1037
1038 let listener = TcpListener::bind(&bind).await?;
1039 axum::serve(listener, build_app(state))
1040 .with_graceful_shutdown(shutdown_signal())
1041 .await?;
1042 Ok(())
1043}
1044
1045fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngine> {
1047 match source {
1048 PolicySource::File(path) => Ok(PolicyEngine::load_graph(path, graph_id)?),
1049 PolicySource::Inline(text) => Ok(PolicyEngine::load_graph_from_source(text, graph_id)?),
1050 }
1051}
1052
1053pub async fn open_multi_graph_state(
1062 graphs: Vec<GraphStartupConfig>,
1063 tokens: Vec<(String, String)>,
1064 server_policy_source: Option<&PolicySource>,
1065 config_path: PathBuf,
1066 require_all_graphs: bool,
1067) -> Result<AppState> {
1068 use futures::StreamExt;
1069
1070 if graphs.is_empty() {
1071 bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1072 }
1073
1074 let server_policy = match server_policy_source {
1079 Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
1080 Some(PolicySource::Inline(source)) => Some(PolicyEngine::load_server_from_source(source)?),
1081 None => None,
1082 };
1083
1084 let configured_graphs = graphs.len();
1085 let results = futures::stream::iter(graphs.into_iter())
1086 .map(|cfg| async move {
1087 let graph_id = cfg.graph_id.clone();
1088 open_single_graph(cfg).await.map_err(|err| (graph_id, err))
1089 })
1090 .buffer_unordered(4)
1091 .collect::<Vec<_>>()
1092 .await;
1093 let mut handles = Vec::new();
1094 let mut failed = 0usize;
1095 for result in results {
1096 match result {
1097 Ok(handle) => handles.push(handle),
1098 Err((graph_id, err)) => {
1099 failed += 1;
1100 warn!(
1101 graph_id = %graph_id,
1102 error = %err,
1103 "graph quarantined during startup"
1104 );
1105 }
1106 }
1107 }
1108 if require_all_graphs && failed > 0 {
1109 bail!(
1110 "strict multi-graph startup requires every graph to open ({} configured, {} failed)",
1111 configured_graphs,
1112 failed
1113 );
1114 }
1115 if handles.is_empty() {
1116 bail!(
1117 "no healthy graphs opened from multi-graph startup config ({} configured, {} failed)",
1118 configured_graphs,
1119 failed
1120 );
1121 }
1122
1123 let workload = workload::WorkloadController::from_env();
1124 let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1125 .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1126 Ok(state)
1127}
1128
1129async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1132 let graph_id = GraphId::try_from(cfg.graph_id.clone())
1133 .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1134 let uri = normalize_root_uri(&cfg.uri)
1135 .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1136
1137 let db = Omnigraph::open(&uri)
1138 .await
1139 .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1140 let db = if let Some(embedding) = cfg.embedding {
1141 db.with_embedding_config(Arc::new(embedding))
1142 } else {
1143 db
1144 };
1145
1146 let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
1151
1152 let (policy_arc, db) = match &cfg.policy {
1153 Some(source) => {
1154 let policy = load_graph_policy(source, graph_id.as_str())?;
1155 let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1156 let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1157 (Some(policy_arc), db.with_policy(checker))
1158 }
1159 None => (None, db),
1160 };
1161
1162 Ok(Arc::new(GraphHandle {
1163 key: GraphKey::cluster(graph_id),
1164 uri,
1165 engine: Arc::new(db),
1166 policy: policy_arc,
1167 queries,
1168 }))
1169}
1170
1171async fn shutdown_signal() {
1172 if let Err(err) = tokio::signal::ctrl_c().await {
1173 error!(error = %err, "failed to install ctrl-c handler");
1174 return;
1175 }
1176 info!("shutdown signal received");
1177}