1pub mod api;
2mod handlers;
3mod settings;
4pub use settings::{load_server_settings, classify_server_runtime_state, ServerRuntimeState};
5use settings::*;
6use handlers::*;
7pub mod auth;
8pub mod graph_id;
9pub mod identity;
10pub mod policy;
11pub mod queries;
12pub mod registry;
13pub mod workload;
14
15pub use graph_id::GraphId;
16pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
17pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
18
19use crate::queries::{QueryRegistry, check, format_check_breakages};
20
21use std::collections::{BTreeMap, HashMap, HashSet};
22use std::fs;
23use std::io;
24use std::io::Write;
25use std::path::PathBuf;
26use std::sync::Arc;
27
28use api::{
29 BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
30 BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
31 CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
32 HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
33 InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
34 SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
35 schema_apply_output, snapshot_payload,
36};
37pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
38use axum::body::{Body, Bytes};
39use axum::extract::DefaultBodyLimit;
40use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
41use axum::http::StatusCode;
42use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
43use axum::middleware::{self, Next};
44use axum::response::{IntoResponse, Response};
45use axum::routing::{delete, get, post};
46use axum::{Json, Router};
47use color_eyre::eyre::{Result, WrapErr, bail, eyre};
48use futures::stream;
49use omnigraph::db::{Omnigraph, ReadTarget};
50use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
51use omnigraph::storage::normalize_root_uri;
52use omnigraph_compiler::catalog::Catalog;
53use omnigraph_compiler::json_params_to_param_map;
54use omnigraph_compiler::query::parser::parse_query;
55use omnigraph_compiler::{JsonParamMode, ParamMap};
56pub use policy::{
57 PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
58 PolicyRequest, PolicyResourceKind, PolicyTestConfig,
59};
60use serde::Deserialize;
61use serde_json::Value;
62use sha2::{Digest, Sha256};
63use subtle::ConstantTimeEq;
64use tokio::net::TcpListener;
65use tokio::sync::mpsc;
66use tower_http::trace::TraceLayer;
67use tracing::{error, info, warn};
68use tracing_subscriber::EnvFilter;
69use utoipa::OpenApi;
70use utoipa::openapi::path::{Parameter, ParameterIn};
71use utoipa::openapi::schema::{Object, Type};
72use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
73
74type BearerTokenHash = [u8; 32];
75
76fn hash_bearer_token(token: &str) -> BearerTokenHash {
77 let digest = Sha256::digest(token.as_bytes());
78 let mut out = [0u8; 32];
79 out.copy_from_slice(&digest);
80 out
81}
82
83#[derive(OpenApi)]
84#[openapi(
85 info(
86 title = "Omnigraph API",
87 description = "HTTP API for the Omnigraph graph database",
88 ),
89 paths(
90 handlers::server_health,
91 handlers::server_graphs_list,
92 handlers::server_snapshot,
93 #[allow(deprecated)] handlers::server_read,
96 handlers::server_query,
97 handlers::server_export,
98 #[allow(deprecated)] handlers::server_change,
99 handlers::server_mutate,
100 handlers::server_list_queries,
101 handlers::server_invoke_query,
102 handlers::server_schema_apply,
103 handlers::server_schema_get,
104 handlers::server_load,
105 #[allow(deprecated)] handlers::server_ingest,
108 handlers::server_branch_list,
109 handlers::server_branch_create,
110 handlers::server_branch_delete,
111 handlers::server_branch_merge,
112 handlers::server_commit_list,
113 handlers::server_commit_show,
114 ),
115 modifiers(&SecurityAddon),
116)]
117pub struct ApiDoc;
118
119pub fn served_openapi() -> utoipa::openapi::OpenApi {
128 let mut doc = ApiDoc::openapi();
129 handlers::nest_paths_under_cluster_prefix(&mut doc);
130 doc
131}
132
133struct SecurityAddon;
134
135impl utoipa::Modify for SecurityAddon {
136 fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
137 openapi
138 .components
139 .get_or_insert_with(Default::default)
140 .add_security_scheme(
141 "bearer_token",
142 SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
143 );
144 }
145}
146
/// Body-size limit applied to the whole router (1 MiB).
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1024 * 1024;
/// Larger body-size limit applied to the `/load` and `/ingest` routes (32 MiB).
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 << 20;
149const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
150const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
151
152#[derive(Debug, Clone)]
153pub struct ServerConfig {
154 pub mode: ServerConfigMode,
159 pub bind: String,
160 pub allow_unauthenticated: bool,
169}
170
171#[derive(Debug, Clone)]
175pub enum ServerConfigMode {
176 Multi {
180 graphs: Vec<GraphStartupConfig>,
183 config_path: PathBuf,
188 server_policy: Option<PolicySource>,
191 },
192}
193
/// Where a policy bundle is read from.
#[derive(Debug, Clone)]
pub enum PolicySource {
    /// Load the policy from the file at this path.
    File(PathBuf),
    /// Use this text as the policy source directly.
    Inline(String),
}
203
204#[derive(Debug, Clone)]
208pub struct GraphStartupConfig {
209 pub graph_id: String,
210 pub uri: String,
211 pub policy: Option<PolicySource>,
212 pub embedding: Option<omnigraph::embedding::EmbeddingConfig>,
215 pub queries: QueryRegistry,
219}
220
221#[derive(Clone)]
237pub struct GraphRouting {
238 pub registry: Arc<GraphRegistry>,
239 pub config_path: Option<PathBuf>,
240}
241
242#[derive(Clone)]
243pub struct AppState {
244 routing: GraphRouting,
251 workload: Arc<workload::WorkloadController>,
254 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
255 server_policy: Option<Arc<PolicyEngine>>,
260}
261
262struct ExportStreamWriter {
263 sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
264}
265
266impl Write for ExportStreamWriter {
267 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
268 self.sender
269 .send(Ok(Bytes::copy_from_slice(buf)))
270 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
271 Ok(buf.len())
272 }
273
274 fn flush(&mut self) -> io::Result<()> {
275 Ok(())
276 }
277}
278
279#[derive(Debug)]
280pub struct ApiError {
281 status: StatusCode,
282 code: ErrorCode,
283 message: String,
284 merge_conflicts: Vec<api::MergeConflictOutput>,
285 manifest_conflict: Option<api::ManifestConflictOutput>,
286}
287
288impl AppState {
289 pub fn new_single(
298 uri: String,
299 db: Omnigraph,
300 bearer_tokens: Vec<(String, String)>,
301 policy_engine: Option<PolicyEngine>,
302 workload: workload::WorkloadController,
303 ) -> Self {
304 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
305 let per_graph_policy = policy_engine.map(Arc::new);
306 Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
307 }
308
309 fn new_single_with_queries(
314 uri: String,
315 db: Omnigraph,
316 bearer_tokens: Vec<(String, String)>,
317 policy_engine: Option<PolicyEngine>,
318 workload: workload::WorkloadController,
319 queries: Option<Arc<QueryRegistry>>,
320 ) -> Self {
321 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
322 let per_graph_policy = policy_engine.map(Arc::new);
323 Self::build_single_mode(
324 uri,
325 db,
326 bearer_tokens,
327 per_graph_policy,
328 Arc::new(workload),
329 queries,
330 )
331 }
332
333 pub fn new(uri: String, db: Omnigraph) -> Self {
334 Self::new_single(
335 uri,
336 db,
337 Vec::new(),
338 None,
339 workload::WorkloadController::from_env(),
340 )
341 }
342
343 pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
344 let bearer_tokens = normalize_bearer_token(bearer_token)
345 .into_iter()
346 .map(|token| ("default".to_string(), token))
347 .collect();
348 Self::new_with_bearer_tokens(uri, db, bearer_tokens)
349 }
350
351 pub fn new_with_bearer_tokens(
352 uri: String,
353 db: Omnigraph,
354 bearer_tokens: Vec<(String, String)>,
355 ) -> Self {
356 Self::new_single(
357 uri,
358 db,
359 bearer_tokens,
360 None,
361 workload::WorkloadController::from_env(),
362 )
363 }
364
365 pub fn new_with_bearer_tokens_and_policy(
366 uri: String,
367 db: Omnigraph,
368 bearer_tokens: Vec<(String, String)>,
369 policy_engine: Option<PolicyEngine>,
370 ) -> Self {
371 Self::new_single(
372 uri,
373 db,
374 bearer_tokens,
375 policy_engine,
376 workload::WorkloadController::from_env(),
377 )
378 }
379
380 pub fn new_with_workload(
386 uri: String,
387 db: Omnigraph,
388 bearer_tokens: Vec<(String, String)>,
389 workload: workload::WorkloadController,
390 ) -> Self {
391 Self::new_single(uri, db, bearer_tokens, None, workload)
392 }
393
394 pub async fn open(uri: impl Into<String>) -> Result<Self> {
395 Self::open_with_bearer_token(uri, None).await
396 }
397
398 pub async fn open_with_bearer_token(
399 uri: impl Into<String>,
400 bearer_token: Option<String>,
401 ) -> Result<Self> {
402 let bearer_tokens = normalize_bearer_token(bearer_token)
403 .into_iter()
404 .map(|token| ("default".to_string(), token))
405 .collect();
406 Self::open_with_bearer_tokens(uri, bearer_tokens).await
407 }
408
409 pub async fn open_with_bearer_tokens(
410 uri: impl Into<String>,
411 bearer_tokens: Vec<(String, String)>,
412 ) -> Result<Self> {
413 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
414 let db = Omnigraph::open(&uri).await?;
415 Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
416 }
417
418 pub async fn open_with_bearer_tokens_and_policy(
419 uri: impl Into<String>,
420 bearer_tokens: Vec<(String, String)>,
421 policy_file: Option<&PathBuf>,
422 ) -> Result<Self> {
423 Self::open_single_with_queries(
424 uri,
425 bearer_tokens,
426 policy_file,
427 QueryRegistry::default(),
428 )
429 .await
430 }
431
432 pub async fn open_single_with_queries(
440 uri: impl Into<String>,
441 bearer_tokens: Vec<(String, String)>,
442 policy_file: Option<&PathBuf>,
443 queries: QueryRegistry,
444 ) -> Result<Self> {
445 Self::open_single_with_queries_for_graph_id(uri, bearer_tokens, policy_file, queries, None)
446 .await
447 }
448
449 async fn open_single_with_queries_for_graph_id(
450 uri: impl Into<String>,
451 bearer_tokens: Vec<(String, String)>,
452 policy_file: Option<&PathBuf>,
453 queries: QueryRegistry,
454 graph_id: Option<String>,
455 ) -> Result<Self> {
456 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
462 let graph_id = graph_id.unwrap_or_else(|| uri.clone());
463 let db = Omnigraph::open(&uri).await?;
464
465 let registry = validate_and_attach(queries, &db.catalog(), &graph_id)?;
468
469 let policy_engine = match policy_file {
470 Some(path) => Some(PolicyEngine::load_graph(path, &graph_id)?),
471 None => None,
472 };
473 Ok(Self::new_single_with_queries(
474 uri,
475 db,
476 bearer_tokens,
477 policy_engine,
478 workload::WorkloadController::from_env(),
479 registry,
480 ))
481 }
482
483 fn build_single_mode(
491 uri: String,
492 db: Omnigraph,
493 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
494 policy_engine: Option<Arc<PolicyEngine>>,
495 workload: Arc<workload::WorkloadController>,
496 queries: Option<Arc<QueryRegistry>>,
497 ) -> Self {
498 let db = if let Some(policy) = policy_engine.as_ref() {
503 let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
504 db.with_policy(checker)
505 } else {
506 db
507 };
508 let uri = normalize_root_uri(&uri).unwrap_or(uri);
512 let graph_id =
513 GraphId::try_from("default").expect("'default' is a valid GraphId");
514 let key = GraphKey::cluster(graph_id);
515 let handle = Arc::new(GraphHandle {
516 key,
517 uri,
518 engine: Arc::new(db),
519 policy: policy_engine,
520 queries,
521 });
522 let registry = Arc::new(
523 GraphRegistry::from_handles(vec![handle])
524 .expect("a single handle never collides on graph id"),
525 );
526 Self {
527 routing: GraphRouting {
528 registry,
529 config_path: None,
530 },
531 workload,
532 bearer_tokens,
533 server_policy: None,
534 }
535 }
536
537 pub fn new_multi(
544 handles: Vec<Arc<GraphHandle>>,
545 bearer_tokens: Vec<(String, String)>,
546 server_policy: Option<PolicyEngine>,
547 workload: workload::WorkloadController,
548 config_path: Option<PathBuf>,
549 ) -> std::result::Result<Self, InsertError> {
550 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
551 let registry = Arc::new(GraphRegistry::from_handles(handles)?);
552 Ok(Self {
553 routing: GraphRouting {
554 registry,
555 config_path,
556 },
557 workload: Arc::new(workload),
558 bearer_tokens,
559 server_policy: server_policy.map(Arc::new),
560 })
561 }
562
563 pub fn routing(&self) -> &GraphRouting {
567 &self.routing
568 }
569
570 fn requires_bearer_auth(&self) -> bool {
571 if !self.bearer_tokens.is_empty() {
572 return true;
573 }
574 if self.server_policy.is_some() {
575 return true;
576 }
577 self.routing.registry.snapshot_ref().any_per_graph_policy
582 }
583
584 fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
585 let provided_hash = hash_bearer_token(provided_token);
589 let mut matched: Option<Arc<str>> = None;
590 for (hash, actor) in self.bearer_tokens.iter() {
591 if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
592 matched = Some(Arc::clone(actor));
593 }
594 }
595 matched.map(ResolvedActor::cluster_static)
596 }
597}
598
599fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
600 let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
601 .into_iter()
602 .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
603 .collect();
604 Arc::from(tokens)
605}
606
607impl ApiError {
608 pub fn unauthorized(message: impl Into<String>) -> Self {
609 Self {
610 status: StatusCode::UNAUTHORIZED,
611 code: ErrorCode::Unauthorized,
612 message: message.into(),
613 merge_conflicts: Vec::new(),
614 manifest_conflict: None,
615 }
616 }
617
618 pub fn forbidden(message: impl Into<String>) -> Self {
619 Self {
620 status: StatusCode::FORBIDDEN,
621 code: ErrorCode::Forbidden,
622 message: message.into(),
623 merge_conflicts: Vec::new(),
624 manifest_conflict: None,
625 }
626 }
627
628 pub fn bad_request(message: impl Into<String>) -> Self {
629 Self {
630 status: StatusCode::BAD_REQUEST,
631 code: ErrorCode::BadRequest,
632 message: message.into(),
633 merge_conflicts: Vec::new(),
634 manifest_conflict: None,
635 }
636 }
637
638 pub fn not_found(message: impl Into<String>) -> Self {
639 Self {
640 status: StatusCode::NOT_FOUND,
641 code: ErrorCode::NotFound,
642 message: message.into(),
643 merge_conflicts: Vec::new(),
644 manifest_conflict: None,
645 }
646 }
647
648 pub fn method_not_allowed(message: impl Into<String>) -> Self {
653 Self {
654 status: StatusCode::METHOD_NOT_ALLOWED,
655 code: ErrorCode::MethodNotAllowed,
656 message: message.into(),
657 merge_conflicts: Vec::new(),
658 manifest_conflict: None,
659 }
660 }
661
662 pub fn conflict(message: impl Into<String>) -> Self {
663 Self {
664 status: StatusCode::CONFLICT,
665 code: ErrorCode::Conflict,
666 message: message.into(),
667 merge_conflicts: Vec::new(),
668 manifest_conflict: None,
669 }
670 }
671
672 pub fn internal(message: impl Into<String>) -> Self {
673 Self {
674 status: StatusCode::INTERNAL_SERVER_ERROR,
675 code: ErrorCode::Internal,
676 message: message.into(),
677 merge_conflicts: Vec::new(),
678 manifest_conflict: None,
679 }
680 }
681
682 pub fn too_many_requests(message: impl Into<String>) -> Self {
687 Self {
688 status: StatusCode::TOO_MANY_REQUESTS,
689 code: ErrorCode::TooManyRequests,
690 message: message.into(),
691 merge_conflicts: Vec::new(),
692 manifest_conflict: None,
693 }
694 }
695
696 pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
699 match reject {
700 workload::RejectReason::InFlightCountExceeded { .. }
701 | workload::RejectReason::ByteBudgetExceeded { .. } => {
702 Self::too_many_requests(reject.to_string())
703 }
704 }
705 }
706
707 fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
708 Self {
709 status: StatusCode::CONFLICT,
710 code: ErrorCode::Conflict,
711 message: summarize_merge_conflicts(&conflicts),
712 merge_conflicts: conflicts,
713 manifest_conflict: None,
714 }
715 }
716
717 fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
718 Self {
719 status: StatusCode::CONFLICT,
720 code: ErrorCode::Conflict,
721 message,
722 merge_conflicts: Vec::new(),
723 manifest_conflict: Some(details),
724 }
725 }
726
727 fn from_omni(err: OmniError) -> Self {
728 match err {
729 OmniError::Compiler(err) => Self::bad_request(err.to_string()),
730 OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
731 OmniError::Manifest(err) => match err.kind {
732 ManifestErrorKind::BadRequest => Self::bad_request(err.message),
733 ManifestErrorKind::NotFound => Self::not_found(err.message),
734 ManifestErrorKind::Conflict => match err.details {
735 Some(ManifestConflictDetails::ExpectedVersionMismatch {
736 table_key,
737 expected,
738 actual,
739 }) => Self::manifest_version_conflict(
740 err.message,
741 api::ManifestConflictOutput {
742 table_key,
743 expected,
744 actual,
745 },
746 ),
747 _ => Self::conflict(err.message),
748 },
749 ManifestErrorKind::Internal => Self::internal(err.message),
750 },
751 OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
752 conflicts
753 .iter()
754 .map(api::MergeConflictOutput::from)
755 .collect(),
756 ),
757 OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
758 OmniError::Io(err) => Self::internal(format!("io: {err}")),
759 OmniError::Policy(message) => Self::forbidden(message),
766 err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
772 }
773 }
774}
775
776fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
777 if conflicts.is_empty() {
778 return "merge conflicts".to_string();
779 }
780
781 let preview: Vec<String> = conflicts
782 .iter()
783 .take(3)
784 .map(|conflict| match conflict.row_id.as_deref() {
785 Some(row_id) => format!(
786 "{}:{} ({})",
787 conflict.table_key,
788 row_id,
789 conflict.kind.as_str()
790 ),
791 None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
792 })
793 .collect();
794
795 let suffix = if conflicts.len() > preview.len() {
796 format!("; and {} more", conflicts.len() - preview.len())
797 } else {
798 String::new()
799 };
800
801 format!("merge conflicts: {}{}", preview.join("; "), suffix)
802}
803
804const RETRY_AFTER_SECONDS: &str = "60";
806
807impl IntoResponse for ApiError {
808 fn into_response(self) -> Response {
809 let mut headers = axum::http::HeaderMap::new();
810 if matches!(self.code, ErrorCode::TooManyRequests) {
811 headers.insert(
812 axum::http::header::RETRY_AFTER,
813 axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
814 );
815 }
816 (
817 self.status,
818 headers,
819 Json(ErrorOutput {
820 error: self.message,
821 code: Some(self.code),
822 merge_conflicts: self.merge_conflicts,
823 manifest_conflict: self.manifest_conflict,
824 }),
825 )
826 .into_response()
827 }
828}
829
830pub fn init_tracing() {
831 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
832 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
833}
834
835fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
837 for warning in &report.warnings {
838 warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
839 }
840}
841
842fn validate_registry_against_catalog(
843 registry: &QueryRegistry,
844 catalog: &Catalog,
845 label: &str,
846) -> omnigraph::error::Result<()> {
847 let report = check(registry, catalog);
848 if report.has_breakages() {
849 return Err(OmniError::manifest(format_check_breakages(label, &report)));
850 }
851 log_registry_warnings(label, &report);
852 Ok(())
853}
854
855fn validate_and_attach(
863 queries: QueryRegistry,
864 catalog: &Catalog,
865 label: &str,
866) -> Result<Option<Arc<QueryRegistry>>> {
867 validate_registry_against_catalog(&queries, catalog, label)
868 .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
869 Ok(if queries.is_empty() {
870 None
871 } else {
872 Some(Arc::new(queries))
873 })
874}
875
876pub fn build_app(state: AppState) -> Router {
877 let per_graph_protected = Router::new()
885 .route("/snapshot", get(server_snapshot))
886 .route("/export", post(server_export))
887 .route("/read", post({
893 #[allow(deprecated)]
894 server_read
895 }))
896 .route("/query", post(server_query))
897 .route("/change", post({
898 #[allow(deprecated)]
899 server_change
900 }))
901 .route("/mutate", post(server_mutate))
902 .route("/queries", get(server_list_queries))
903 .route("/queries/{name}", post(server_invoke_query))
904 .route("/schema", get(server_schema_get))
905 .route("/schema/apply", post(server_schema_apply))
906 .route(
907 "/load",
908 post(server_load).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
909 )
910 .route(
914 "/ingest",
915 post({
916 #[allow(deprecated)]
917 server_ingest
918 })
919 .layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
920 )
921 .route(
922 "/branches",
923 get(server_branch_list).post(server_branch_create),
924 )
925 .route("/branches/{branch}", delete(server_branch_delete))
926 .route("/branches/merge", post(server_branch_merge))
927 .route("/commits", get(server_commit_list))
928 .route("/commits/{commit_id}", get(server_commit_show))
929 .route_layer(middleware::from_fn_with_state(
930 state.clone(),
931 resolve_graph_handle,
932 ))
933 .route_layer(middleware::from_fn_with_state(
934 state.clone(),
935 require_bearer_auth,
936 ));
937
938 let management = Router::new()
945 .route("/graphs", get(server_graphs_list))
946 .route_layer(middleware::from_fn_with_state(
947 state.clone(),
948 require_bearer_auth,
949 ));
950
951 let protected: Router<AppState> = Router::new()
954 .nest("/graphs/{graph_id}", per_graph_protected)
955 .merge(management);
956
957 Router::new()
958 .route("/healthz", get(server_health))
959 .route("/openapi.json", get(server_openapi))
960 .merge(protected)
961 .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
962 .layer(TraceLayer::new_for_http())
963 .with_state(state)
964}
965
966pub async fn serve(config: ServerConfig) -> Result<()> {
967 let token_source = resolve_token_source().await?;
968 info!(source = token_source.name(), "loaded bearer token source");
969 let tokens = token_source.load().await?;
970
971 let has_policy_configured = match &config.mode {
976 ServerConfigMode::Multi {
977 graphs,
978 server_policy,
979 ..
980 } => server_policy.is_some() || graphs.iter().any(|g| g.policy.is_some()),
981 };
982 let runtime_state = classify_server_runtime_state(
983 !tokens.is_empty(),
984 has_policy_configured,
985 config.allow_unauthenticated,
986 )?;
987 match runtime_state {
988 ServerRuntimeState::Open => warn!(
989 "running with --unauthenticated: no bearer tokens, no policy file, all \
990 requests permitted. This is for local dev only — do not expose to a \
991 network you don't fully trust."
992 ),
993 ServerRuntimeState::DefaultDeny => warn!(
994 "bearer tokens are configured but no policy file is set — running in \
995 default-deny mode (only `read` actions are permitted for authenticated \
996 actors). Configure a graph or cluster policy bundle in the cluster config, \
997 run `omnigraph cluster apply`, and restart to enable Cedar rules."
998 ),
999 ServerRuntimeState::PolicyEnabled => {}
1000 }
1001
1002 let bind = config.bind.clone();
1003 let state = match config.mode {
1004 ServerConfigMode::Multi {
1005 graphs,
1006 config_path,
1007 server_policy,
1008 } => {
1009 info!(
1010 bind = %bind,
1011 mode = "cluster",
1012 graph_count = graphs.len(),
1013 config = %config_path.display(),
1014 "serving omnigraph"
1015 );
1016 open_multi_graph_state(graphs, tokens, server_policy.as_ref(), config_path).await?
1017 }
1018 };
1019
1020 let listener = TcpListener::bind(&bind).await?;
1021 axum::serve(listener, build_app(state))
1022 .with_graceful_shutdown(shutdown_signal())
1023 .await?;
1024 Ok(())
1025}
1026
1027fn load_graph_policy(source: &PolicySource, graph_id: &str) -> Result<PolicyEngine> {
1029 match source {
1030 PolicySource::File(path) => Ok(PolicyEngine::load_graph(path, graph_id)?),
1031 PolicySource::Inline(text) => Ok(PolicyEngine::load_graph_from_source(text, graph_id)?),
1032 }
1033}
1034
1035pub async fn open_multi_graph_state(
1044 graphs: Vec<GraphStartupConfig>,
1045 tokens: Vec<(String, String)>,
1046 server_policy_source: Option<&PolicySource>,
1047 config_path: PathBuf,
1048) -> Result<AppState> {
1049 use futures::{StreamExt, TryStreamExt};
1050
1051 if graphs.is_empty() {
1052 bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1053 }
1054
1055 let server_policy = match server_policy_source {
1060 Some(PolicySource::File(path)) => Some(PolicyEngine::load_server(path)?),
1061 Some(PolicySource::Inline(source)) => {
1062 Some(PolicyEngine::load_server_from_source(source)?)
1063 }
1064 None => None,
1065 };
1066
1067 let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
1072 .map(|cfg| async move { open_single_graph(cfg).await })
1073 .buffer_unordered(4)
1074 .try_collect()
1075 .await?;
1076
1077 let workload = workload::WorkloadController::from_env();
1078 let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1079 .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1080 Ok(state)
1081}
1082
1083async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1086 let graph_id = GraphId::try_from(cfg.graph_id.clone())
1087 .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1088 let uri = normalize_root_uri(&cfg.uri)
1089 .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1090
1091 let db = Omnigraph::open(&uri)
1092 .await
1093 .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1094 let db = if let Some(embedding) = cfg.embedding {
1095 db.with_embedding_config(Arc::new(embedding))
1096 } else {
1097 db
1098 };
1099
1100 let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
1105
1106 let (policy_arc, db) = match &cfg.policy {
1107 Some(source) => {
1108 let policy = load_graph_policy(source, graph_id.as_str())?;
1109 let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1110 let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1111 (Some(policy_arc), db.with_policy(checker))
1112 }
1113 None => (None, db),
1114 };
1115
1116 Ok(Arc::new(GraphHandle {
1117 key: GraphKey::cluster(graph_id),
1118 uri,
1119 engine: Arc::new(db),
1120 policy: policy_arc,
1121 queries,
1122 }))
1123}
1124
1125async fn shutdown_signal() {
1126 if let Err(err) = tokio::signal::ctrl_c().await {
1127 error!(error = %err, "failed to install ctrl-c handler");
1128 return;
1129 }
1130 info!("shutdown signal received");
1131}