1pub mod api;
2pub mod auth;
3pub mod config;
4pub mod graph_id;
5pub mod identity;
6pub mod policy;
7pub mod queries;
8pub mod registry;
9pub mod workload;
10
11pub use graph_id::GraphId;
12pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
13pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
14
15use crate::queries::{QueryRegistry, check, format_check_breakages};
16
17use std::collections::{HashMap, HashSet};
18use std::fs;
19use std::io;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23
24use api::{
25 BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
26 BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
27 CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
28 HealthOutput, IngestOutput, IngestRequest, InvokeStoredQueryRequest,
29 InvokeStoredQueryResponse, QueriesCatalogOutput, QueryRequest, ReadOutput, ReadRequest,
30 SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
31 schema_apply_output, snapshot_payload,
32};
33pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
34use axum::body::{Body, Bytes};
35use axum::extract::DefaultBodyLimit;
36use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
37use axum::http::StatusCode;
38use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
39use axum::middleware::{self, Next};
40use axum::response::{IntoResponse, Response};
41use axum::routing::{delete, get, post};
42use axum::{Json, Router};
43use color_eyre::eyre::{Result, WrapErr, bail};
44pub use config::{
45 AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
46 ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
47 graph_resource_id_for_selection, load_config,
48};
49use futures::stream;
50use omnigraph::db::{Omnigraph, ReadTarget};
51use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
52use omnigraph::storage::normalize_root_uri;
53use omnigraph_compiler::catalog::Catalog;
54use omnigraph_compiler::json_params_to_param_map;
55use omnigraph_compiler::query::parser::parse_query;
56use omnigraph_compiler::{JsonParamMode, ParamMap};
57pub use policy::{
58 PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
59 PolicyRequest, PolicyResourceKind, PolicyTestConfig,
60};
61use serde::Deserialize;
62use serde_json::Value;
63use sha2::{Digest, Sha256};
64use subtle::ConstantTimeEq;
65use tokio::net::TcpListener;
66use tokio::sync::mpsc;
67use tower_http::trace::TraceLayer;
68use tracing::{error, info, warn};
69use tracing_subscriber::EnvFilter;
70use utoipa::OpenApi;
71use utoipa::openapi::path::{Parameter, ParameterIn};
72use utoipa::openapi::schema::{Object, Type};
73use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
74
75type BearerTokenHash = [u8; 32];
76
77fn hash_bearer_token(token: &str) -> BearerTokenHash {
78 let digest = Sha256::digest(token.as_bytes());
79 let mut out = [0u8; 32];
80 out.copy_from_slice(&digest);
81 out
82}
83
84#[derive(OpenApi)]
85#[openapi(
86 info(
87 title = "Omnigraph API",
88 description = "HTTP API for the Omnigraph graph database",
89 ),
90 paths(
91 server_health,
92 server_graphs_list,
93 server_snapshot,
94 #[allow(deprecated)] server_read,
97 server_query,
98 server_export,
99 #[allow(deprecated)] server_change,
100 server_mutate,
101 server_list_queries,
102 server_invoke_query,
103 server_schema_apply,
104 server_schema_get,
105 server_ingest,
106 server_branch_list,
107 server_branch_create,
108 server_branch_delete,
109 server_branch_merge,
110 server_commit_list,
111 server_commit_show,
112 ),
113 modifiers(&SecurityAddon),
114)]
115pub struct ApiDoc;
116
117struct SecurityAddon;
118
119impl utoipa::Modify for SecurityAddon {
120 fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
121 openapi
122 .components
123 .get_or_insert_with(Default::default)
124 .add_security_scheme(
125 "bearer_token",
126 SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
127 );
128 }
129}
130
131const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1_048_576;
132const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 * 1024 * 1024;
133const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
134const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
135
136#[derive(Debug, Clone)]
137pub struct ServerConfig {
138 pub mode: ServerConfigMode,
144 pub bind: String,
145 pub allow_unauthenticated: bool,
154}
155
156#[derive(Debug, Clone)]
159pub enum ServerConfigMode {
160 Single {
166 uri: String,
167 graph_id: String,
171 policy_file: Option<PathBuf>,
173 queries: QueryRegistry,
177 },
178 Multi {
181 graphs: Vec<GraphStartupConfig>,
184 config_path: PathBuf,
189 server_policy_file: Option<PathBuf>,
192 },
193}
194
195#[derive(Debug, Clone)]
199pub struct GraphStartupConfig {
200 pub graph_id: String,
201 pub uri: String,
202 pub policy_file: Option<PathBuf>,
203 pub queries: QueryRegistry,
207}
208
209#[derive(Clone)]
224pub enum GraphRouting {
225 Single { handle: Arc<GraphHandle> },
229 Multi {
236 registry: Arc<GraphRegistry>,
237 config_path: Option<PathBuf>,
238 },
239}
240
241#[derive(Clone)]
242pub struct AppState {
243 routing: GraphRouting,
250 workload: Arc<workload::WorkloadController>,
253 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
254 server_policy: Option<Arc<PolicyEngine>>,
261}
262
263struct ExportStreamWriter {
264 sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
265}
266
267impl Write for ExportStreamWriter {
268 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
269 self.sender
270 .send(Ok(Bytes::copy_from_slice(buf)))
271 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
272 Ok(buf.len())
273 }
274
275 fn flush(&mut self) -> io::Result<()> {
276 Ok(())
277 }
278}
279
280#[derive(Debug)]
281pub struct ApiError {
282 status: StatusCode,
283 code: ErrorCode,
284 message: String,
285 merge_conflicts: Vec<api::MergeConflictOutput>,
286 manifest_conflict: Option<api::ManifestConflictOutput>,
287}
288
289impl AppState {
290 pub fn new_single(
299 uri: String,
300 db: Omnigraph,
301 bearer_tokens: Vec<(String, String)>,
302 policy_engine: Option<PolicyEngine>,
303 workload: workload::WorkloadController,
304 ) -> Self {
305 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
306 let per_graph_policy = policy_engine.map(Arc::new);
307 Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload), None)
308 }
309
310 fn new_single_with_queries(
315 uri: String,
316 db: Omnigraph,
317 bearer_tokens: Vec<(String, String)>,
318 policy_engine: Option<PolicyEngine>,
319 workload: workload::WorkloadController,
320 queries: Option<Arc<QueryRegistry>>,
321 ) -> Self {
322 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
323 let per_graph_policy = policy_engine.map(Arc::new);
324 Self::build_single_mode(
325 uri,
326 db,
327 bearer_tokens,
328 per_graph_policy,
329 Arc::new(workload),
330 queries,
331 )
332 }
333
334 pub fn new(uri: String, db: Omnigraph) -> Self {
335 Self::new_single(
336 uri,
337 db,
338 Vec::new(),
339 None,
340 workload::WorkloadController::from_env(),
341 )
342 }
343
344 pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
345 let bearer_tokens = normalize_bearer_token(bearer_token)
346 .into_iter()
347 .map(|token| ("default".to_string(), token))
348 .collect();
349 Self::new_with_bearer_tokens(uri, db, bearer_tokens)
350 }
351
352 pub fn new_with_bearer_tokens(
353 uri: String,
354 db: Omnigraph,
355 bearer_tokens: Vec<(String, String)>,
356 ) -> Self {
357 Self::new_single(
358 uri,
359 db,
360 bearer_tokens,
361 None,
362 workload::WorkloadController::from_env(),
363 )
364 }
365
366 pub fn new_with_bearer_tokens_and_policy(
367 uri: String,
368 db: Omnigraph,
369 bearer_tokens: Vec<(String, String)>,
370 policy_engine: Option<PolicyEngine>,
371 ) -> Self {
372 Self::new_single(
373 uri,
374 db,
375 bearer_tokens,
376 policy_engine,
377 workload::WorkloadController::from_env(),
378 )
379 }
380
381 pub fn new_with_workload(
387 uri: String,
388 db: Omnigraph,
389 bearer_tokens: Vec<(String, String)>,
390 workload: workload::WorkloadController,
391 ) -> Self {
392 Self::new_single(uri, db, bearer_tokens, None, workload)
393 }
394
395 pub async fn open(uri: impl Into<String>) -> Result<Self> {
396 Self::open_with_bearer_token(uri, None).await
397 }
398
399 pub async fn open_with_bearer_token(
400 uri: impl Into<String>,
401 bearer_token: Option<String>,
402 ) -> Result<Self> {
403 let bearer_tokens = normalize_bearer_token(bearer_token)
404 .into_iter()
405 .map(|token| ("default".to_string(), token))
406 .collect();
407 Self::open_with_bearer_tokens(uri, bearer_tokens).await
408 }
409
410 pub async fn open_with_bearer_tokens(
411 uri: impl Into<String>,
412 bearer_tokens: Vec<(String, String)>,
413 ) -> Result<Self> {
414 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
415 let db = Omnigraph::open(&uri).await?;
416 Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
417 }
418
419 pub async fn open_with_bearer_tokens_and_policy(
420 uri: impl Into<String>,
421 bearer_tokens: Vec<(String, String)>,
422 policy_file: Option<&PathBuf>,
423 ) -> Result<Self> {
424 Self::open_single_with_queries(
425 uri,
426 bearer_tokens,
427 policy_file,
428 QueryRegistry::default(),
429 )
430 .await
431 }
432
433 pub async fn open_single_with_queries(
441 uri: impl Into<String>,
442 bearer_tokens: Vec<(String, String)>,
443 policy_file: Option<&PathBuf>,
444 queries: QueryRegistry,
445 ) -> Result<Self> {
446 Self::open_single_with_queries_for_graph_id(uri, bearer_tokens, policy_file, queries, None)
447 .await
448 }
449
450 async fn open_single_with_queries_for_graph_id(
451 uri: impl Into<String>,
452 bearer_tokens: Vec<(String, String)>,
453 policy_file: Option<&PathBuf>,
454 queries: QueryRegistry,
455 graph_id: Option<String>,
456 ) -> Result<Self> {
457 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
463 let graph_id = graph_id.unwrap_or_else(|| uri.clone());
464 let db = Omnigraph::open(&uri).await?;
465
466 let registry = validate_and_attach(queries, &db.catalog(), &graph_id)?;
469
470 let policy_engine = match policy_file {
471 Some(path) => Some(PolicyEngine::load_graph(path, &graph_id)?),
472 None => None,
473 };
474 Ok(Self::new_single_with_queries(
475 uri,
476 db,
477 bearer_tokens,
478 policy_engine,
479 workload::WorkloadController::from_env(),
480 registry,
481 ))
482 }
483
484 fn build_single_mode(
490 uri: String,
491 db: Omnigraph,
492 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
493 policy_engine: Option<Arc<PolicyEngine>>,
494 workload: Arc<workload::WorkloadController>,
495 queries: Option<Arc<QueryRegistry>>,
496 ) -> Self {
497 let db = if let Some(policy) = policy_engine.as_ref() {
502 let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
503 db.with_policy(checker)
504 } else {
505 db
506 };
507 let uri = normalize_root_uri(&uri).unwrap_or(uri);
516 let key = GraphKey::cluster(
517 GraphId::try_from("default").expect("'default' is a valid GraphId log label"),
518 );
519 let handle = Arc::new(GraphHandle {
520 key,
521 uri,
522 engine: Arc::new(db),
523 policy: policy_engine,
524 queries,
525 });
526 Self {
527 routing: GraphRouting::Single { handle },
528 workload,
529 bearer_tokens,
530 server_policy: None,
531 }
532 }
533
534 pub fn new_multi(
542 handles: Vec<Arc<GraphHandle>>,
543 bearer_tokens: Vec<(String, String)>,
544 server_policy: Option<PolicyEngine>,
545 workload: workload::WorkloadController,
546 config_path: Option<PathBuf>,
547 ) -> std::result::Result<Self, InsertError> {
548 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
549 let registry = Arc::new(GraphRegistry::from_handles(handles)?);
550 Ok(Self {
551 routing: GraphRouting::Multi {
552 registry,
553 config_path,
554 },
555 workload: Arc::new(workload),
556 bearer_tokens,
557 server_policy: server_policy.map(Arc::new),
558 })
559 }
560
561 pub fn routing(&self) -> &GraphRouting {
567 &self.routing
568 }
569
570 fn requires_bearer_auth(&self) -> bool {
571 if !self.bearer_tokens.is_empty() {
572 return true;
573 }
574 if self.server_policy.is_some() {
575 return true;
576 }
577 match &self.routing {
583 GraphRouting::Single { handle } => handle.policy.is_some(),
584 GraphRouting::Multi { registry, .. } => registry.snapshot_ref().any_per_graph_policy,
585 }
586 }
587
588 fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
589 let provided_hash = hash_bearer_token(provided_token);
593 let mut matched: Option<Arc<str>> = None;
594 for (hash, actor) in self.bearer_tokens.iter() {
595 if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
596 matched = Some(Arc::clone(actor));
597 }
598 }
599 matched.map(ResolvedActor::cluster_static)
600 }
601}
602
603fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
604 let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
605 .into_iter()
606 .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
607 .collect();
608 Arc::from(tokens)
609}
610
611impl ApiError {
612 pub fn unauthorized(message: impl Into<String>) -> Self {
613 Self {
614 status: StatusCode::UNAUTHORIZED,
615 code: ErrorCode::Unauthorized,
616 message: message.into(),
617 merge_conflicts: Vec::new(),
618 manifest_conflict: None,
619 }
620 }
621
622 pub fn forbidden(message: impl Into<String>) -> Self {
623 Self {
624 status: StatusCode::FORBIDDEN,
625 code: ErrorCode::Forbidden,
626 message: message.into(),
627 merge_conflicts: Vec::new(),
628 manifest_conflict: None,
629 }
630 }
631
632 pub fn bad_request(message: impl Into<String>) -> Self {
633 Self {
634 status: StatusCode::BAD_REQUEST,
635 code: ErrorCode::BadRequest,
636 message: message.into(),
637 merge_conflicts: Vec::new(),
638 manifest_conflict: None,
639 }
640 }
641
642 pub fn not_found(message: impl Into<String>) -> Self {
643 Self {
644 status: StatusCode::NOT_FOUND,
645 code: ErrorCode::NotFound,
646 message: message.into(),
647 merge_conflicts: Vec::new(),
648 manifest_conflict: None,
649 }
650 }
651
652 pub fn method_not_allowed(message: impl Into<String>) -> Self {
657 Self {
658 status: StatusCode::METHOD_NOT_ALLOWED,
659 code: ErrorCode::MethodNotAllowed,
660 message: message.into(),
661 merge_conflicts: Vec::new(),
662 manifest_conflict: None,
663 }
664 }
665
666 pub fn conflict(message: impl Into<String>) -> Self {
667 Self {
668 status: StatusCode::CONFLICT,
669 code: ErrorCode::Conflict,
670 message: message.into(),
671 merge_conflicts: Vec::new(),
672 manifest_conflict: None,
673 }
674 }
675
676 pub fn internal(message: impl Into<String>) -> Self {
677 Self {
678 status: StatusCode::INTERNAL_SERVER_ERROR,
679 code: ErrorCode::Internal,
680 message: message.into(),
681 merge_conflicts: Vec::new(),
682 manifest_conflict: None,
683 }
684 }
685
686 pub fn too_many_requests(message: impl Into<String>) -> Self {
691 Self {
692 status: StatusCode::TOO_MANY_REQUESTS,
693 code: ErrorCode::TooManyRequests,
694 message: message.into(),
695 merge_conflicts: Vec::new(),
696 manifest_conflict: None,
697 }
698 }
699
700 pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
703 match reject {
704 workload::RejectReason::InFlightCountExceeded { .. }
705 | workload::RejectReason::ByteBudgetExceeded { .. } => {
706 Self::too_many_requests(reject.to_string())
707 }
708 }
709 }
710
711 fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
712 Self {
713 status: StatusCode::CONFLICT,
714 code: ErrorCode::Conflict,
715 message: summarize_merge_conflicts(&conflicts),
716 merge_conflicts: conflicts,
717 manifest_conflict: None,
718 }
719 }
720
721 fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
722 Self {
723 status: StatusCode::CONFLICT,
724 code: ErrorCode::Conflict,
725 message,
726 merge_conflicts: Vec::new(),
727 manifest_conflict: Some(details),
728 }
729 }
730
731 fn from_omni(err: OmniError) -> Self {
732 match err {
733 OmniError::Compiler(err) => Self::bad_request(err.to_string()),
734 OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
735 OmniError::Manifest(err) => match err.kind {
736 ManifestErrorKind::BadRequest => Self::bad_request(err.message),
737 ManifestErrorKind::NotFound => Self::not_found(err.message),
738 ManifestErrorKind::Conflict => match err.details {
739 Some(ManifestConflictDetails::ExpectedVersionMismatch {
740 table_key,
741 expected,
742 actual,
743 }) => Self::manifest_version_conflict(
744 err.message,
745 api::ManifestConflictOutput {
746 table_key,
747 expected,
748 actual,
749 },
750 ),
751 _ => Self::conflict(err.message),
752 },
753 ManifestErrorKind::Internal => Self::internal(err.message),
754 },
755 OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
756 conflicts
757 .iter()
758 .map(api::MergeConflictOutput::from)
759 .collect(),
760 ),
761 OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
762 OmniError::Io(err) => Self::internal(format!("io: {err}")),
763 OmniError::Policy(message) => Self::forbidden(message),
770 err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
776 }
777 }
778}
779
780fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
781 if conflicts.is_empty() {
782 return "merge conflicts".to_string();
783 }
784
785 let preview: Vec<String> = conflicts
786 .iter()
787 .take(3)
788 .map(|conflict| match conflict.row_id.as_deref() {
789 Some(row_id) => format!(
790 "{}:{} ({})",
791 conflict.table_key,
792 row_id,
793 conflict.kind.as_str()
794 ),
795 None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
796 })
797 .collect();
798
799 let suffix = if conflicts.len() > preview.len() {
800 format!("; and {} more", conflicts.len() - preview.len())
801 } else {
802 String::new()
803 };
804
805 format!("merge conflicts: {}{}", preview.join("; "), suffix)
806}
807
808const RETRY_AFTER_SECONDS: &str = "60";
810
811impl IntoResponse for ApiError {
812 fn into_response(self) -> Response {
813 let mut headers = axum::http::HeaderMap::new();
814 if matches!(self.code, ErrorCode::TooManyRequests) {
815 headers.insert(
816 axum::http::header::RETRY_AFTER,
817 axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
818 );
819 }
820 (
821 self.status,
822 headers,
823 Json(ErrorOutput {
824 error: self.message,
825 code: Some(self.code),
826 merge_conflicts: self.merge_conflicts,
827 manifest_conflict: self.manifest_conflict,
828 }),
829 )
830 .into_response()
831 }
832}
833
834pub fn init_tracing() {
835 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
836 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
837}
838
839fn log_registry_warnings(label: &str, report: &queries::CheckReport) {
841 for warning in &report.warnings {
842 warn!(graph = label, query = %warning.query, "stored query: {}", warning.message);
843 }
844}
845
846fn validate_registry_against_catalog(
847 registry: &QueryRegistry,
848 catalog: &Catalog,
849 label: &str,
850) -> omnigraph::error::Result<()> {
851 let report = check(registry, catalog);
852 if report.has_breakages() {
853 return Err(OmniError::manifest(format_check_breakages(label, &report)));
854 }
855 log_registry_warnings(label, &report);
856 Ok(())
857}
858
859fn validate_and_attach(
867 queries: QueryRegistry,
868 catalog: &Catalog,
869 label: &str,
870) -> Result<Option<Arc<QueryRegistry>>> {
871 validate_registry_against_catalog(&queries, catalog, label)
872 .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))?;
873 Ok(if queries.is_empty() {
874 None
875 } else {
876 Some(Arc::new(queries))
877 })
878}
879
880fn format_registry_load_errors(label: &str, errors: &[queries::LoadError]) -> String {
883 let joined = errors
884 .iter()
885 .map(|e| e.to_string())
886 .collect::<Vec<_>>()
887 .join("\n ");
888 format!("graph '{label}': stored-query registry failed to load:\n {joined}")
889}
890
891pub fn load_server_settings(
892 config_path: Option<&PathBuf>,
893 cli_uri: Option<String>,
894 cli_target: Option<String>,
895 cli_bind: Option<String>,
896 cli_allow_unauthenticated: bool,
897) -> Result<ServerConfig> {
898 let config = load_config(config_path)?;
899 let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
900 let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
904 .ok()
905 .map(|v| {
906 let trimmed = v.trim();
907 !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
908 })
909 .unwrap_or(false);
910 let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
911
912 let has_cli_uri = cli_uri.is_some();
925 let has_cli_target = cli_target.is_some();
926 let has_server_graph = config.server_graph_name().is_some();
927 let has_graphs_map = !config.graphs.is_empty();
928 let has_explicit_config = config_path.is_some();
929
930 let mode = if has_cli_uri || has_cli_target || has_server_graph {
931 let raw_uri = config.resolve_target_uri(
933 cli_uri,
934 cli_target.as_deref(),
935 config.server_graph_name(),
936 )?;
937 let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
938 format!("normalize single-graph URI '{raw_uri}' from server settings")
939 })?;
940 let selected: Option<&str> = if has_cli_uri {
945 None
946 } else {
947 cli_target.as_deref().or_else(|| config.server_graph_name())
948 };
949 config.ensure_top_level_blocks_honored(selected)?;
954 let policy_file = config.resolve_policy_file_for(selected);
957 let queries = QueryRegistry::load(&config, config.query_entries_for(selected))
958 .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(&uri, &errs)))?;
959 let graph_id = graph_resource_id_for_selection(selected, &uri);
960 ServerConfigMode::Single {
961 uri,
962 graph_id,
963 policy_file,
964 queries,
965 }
966 } else if has_explicit_config && has_graphs_map {
967 let unhonored = config.populated_top_level_blocks();
970 if !unhonored.is_empty() {
971 bail!(
972 "multi-graph mode: top-level {} {} not honored — each graph uses its own \
973 `graphs.<graph_id>.…` block. Move per-graph rules there (and any \
974 `graph_list` policy to `server.policy.file`).",
975 unhonored.join(" and "),
976 if unhonored.len() == 1 { "is" } else { "are" },
977 );
978 }
979 let mut graphs = Vec::with_capacity(config.graphs.len());
981 for (name, target) in &config.graphs {
982 GraphId::try_from(name.clone()).map_err(|err| {
986 color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
987 })?;
988 let raw_uri = config.resolve_uri_value(&target.uri);
989 let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
990 format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
991 })?;
992 let queries = QueryRegistry::load(&config, config.query_entries_for(Some(name.as_str())))
997 .map_err(|errs| color_eyre::eyre::eyre!(format_registry_load_errors(name, &errs)))?;
998 graphs.push(GraphStartupConfig {
999 graph_id: name.clone(),
1000 uri,
1001 policy_file: config.resolve_target_policy_file(name),
1002 queries,
1003 });
1004 }
1005 let config_path = config_path
1006 .cloned()
1007 .expect("has_explicit_config implies config_path is Some");
1008 let server_policy_file = config.resolve_server_policy_file();
1009 ServerConfigMode::Multi {
1010 graphs,
1011 config_path,
1012 server_policy_file,
1013 }
1014 } else {
1015 bail!(
1017 "no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
1018 (`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
1019 omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
1020 file referenced by `--config`."
1021 );
1022 };
1023
1024 Ok(ServerConfig {
1025 mode,
1026 bind,
1027 allow_unauthenticated,
1028 })
1029}
1030
1031pub fn server_config_is_multi(config: &ServerConfig) -> bool {
1034 matches!(config.mode, ServerConfigMode::Multi { .. })
1035}
1036
1037#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1058pub enum ServerRuntimeState {
1059 Open,
1060 DefaultDeny,
1061 PolicyEnabled,
1062}
1063
1064pub fn classify_server_runtime_state(
1075 has_tokens: bool,
1076 has_policy: bool,
1077 allow_unauthenticated: bool,
1078) -> Result<ServerRuntimeState> {
1079 match (has_tokens, has_policy, allow_unauthenticated) {
1080 (false, false, false) => bail!(
1081 "server has no bearer tokens and no policy file configured. This is a fully \
1082 open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
1083 if you actually want that, otherwise configure bearer tokens (see \
1084 docs/user/server.md) and/or `policy.file` in omnigraph.yaml."
1085 ),
1086 (false, false, true) => Ok(ServerRuntimeState::Open),
1087 (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
1088 (false, true, _) => bail!(
1089 "policy file is configured but no bearer tokens — every request would 401 \
1090 because no token can ever match. Configure at least one bearer token (see \
1091 docs/user/server.md), or remove the policy file. To deny all unauthenticated \
1092 traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
1093 produces meaningful 403s with policy-decision logging instead of silent 401s."
1094 ),
1095 (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
1096 }
1097}
1098
1099pub fn build_app(state: AppState) -> Router {
1100 let per_graph_protected = Router::new()
1108 .route("/snapshot", get(server_snapshot))
1109 .route("/export", post(server_export))
1110 .route("/read", post({
1116 #[allow(deprecated)]
1117 server_read
1118 }))
1119 .route("/query", post(server_query))
1120 .route("/change", post({
1121 #[allow(deprecated)]
1122 server_change
1123 }))
1124 .route("/mutate", post(server_mutate))
1125 .route("/queries", get(server_list_queries))
1126 .route("/queries/{name}", post(server_invoke_query))
1127 .route("/schema", get(server_schema_get))
1128 .route("/schema/apply", post(server_schema_apply))
1129 .route(
1130 "/ingest",
1131 post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
1132 )
1133 .route(
1134 "/branches",
1135 get(server_branch_list).post(server_branch_create),
1136 )
1137 .route("/branches/{branch}", delete(server_branch_delete))
1138 .route("/branches/merge", post(server_branch_merge))
1139 .route("/commits", get(server_commit_list))
1140 .route("/commits/{commit_id}", get(server_commit_show))
1141 .route_layer(middleware::from_fn_with_state(
1142 state.clone(),
1143 resolve_graph_handle,
1144 ))
1145 .route_layer(middleware::from_fn_with_state(
1146 state.clone(),
1147 require_bearer_auth,
1148 ));
1149
1150 let management = Router::new()
1161 .route("/graphs", get(server_graphs_list))
1162 .route_layer(middleware::from_fn_with_state(
1163 state.clone(),
1164 require_bearer_auth,
1165 ));
1166
1167 let protected: Router<AppState> = match state.routing() {
1171 GraphRouting::Single { .. } => per_graph_protected.merge(management),
1172 GraphRouting::Multi { .. } => Router::new()
1173 .nest("/graphs/{graph_id}", per_graph_protected)
1174 .merge(management),
1175 };
1176
1177 Router::new()
1178 .route("/healthz", get(server_health))
1179 .route("/openapi.json", get(server_openapi))
1180 .merge(protected)
1181 .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
1182 .layer(TraceLayer::new_for_http())
1183 .with_state(state)
1184}
1185
1186pub async fn serve(config: ServerConfig) -> Result<()> {
1187 let token_source = resolve_token_source().await?;
1188 info!(source = token_source.name(), "loaded bearer token source");
1189 let tokens = token_source.load().await?;
1190
1191 let has_policy_configured = match &config.mode {
1196 ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
1197 ServerConfigMode::Multi {
1198 graphs,
1199 server_policy_file,
1200 ..
1201 } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
1202 };
1203 let runtime_state = classify_server_runtime_state(
1204 !tokens.is_empty(),
1205 has_policy_configured,
1206 config.allow_unauthenticated,
1207 )?;
1208 match runtime_state {
1209 ServerRuntimeState::Open => warn!(
1210 "running with --unauthenticated: no bearer tokens, no policy file, all \
1211 requests permitted. This is for local dev only — do not expose to a \
1212 network you don't fully trust."
1213 ),
1214 ServerRuntimeState::DefaultDeny => warn!(
1215 "bearer tokens are configured but no policy file is set — running in \
1216 default-deny mode (only `read` actions are permitted for authenticated \
1217 actors). Configure `policy.file` in omnigraph.yaml to enable Cedar rules."
1218 ),
1219 ServerRuntimeState::PolicyEnabled => {}
1220 }
1221
1222 let bind = config.bind.clone();
1223 let state = match config.mode {
1224 ServerConfigMode::Single {
1225 uri,
1226 graph_id,
1227 policy_file,
1228 queries,
1229 } => {
1230 let uri_for_log = uri.clone();
1231 info!(
1232 uri = %uri_for_log,
1233 graph_id = %graph_id,
1234 bind = %bind,
1235 mode = "single",
1236 "serving omnigraph"
1237 );
1238 AppState::open_single_with_queries_for_graph_id(
1239 uri,
1240 tokens,
1241 policy_file.as_ref(),
1242 queries,
1243 Some(graph_id),
1244 )
1245 .await?
1246 }
1247 ServerConfigMode::Multi {
1248 graphs,
1249 config_path,
1250 server_policy_file,
1251 } => {
1252 info!(
1253 bind = %bind,
1254 mode = "multi",
1255 graph_count = graphs.len(),
1256 config = %config_path.display(),
1257 "serving omnigraph"
1258 );
1259 open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
1260 }
1261 };
1262
1263 let listener = TcpListener::bind(&bind).await?;
1264 axum::serve(listener, build_app(state))
1265 .with_graceful_shutdown(shutdown_signal())
1266 .await?;
1267 Ok(())
1268}
1269
1270async fn open_multi_graph_state(
1279 graphs: Vec<GraphStartupConfig>,
1280 tokens: Vec<(String, String)>,
1281 server_policy_file: Option<&PathBuf>,
1282 config_path: PathBuf,
1283) -> Result<AppState> {
1284 use futures::{StreamExt, TryStreamExt};
1285
1286 if graphs.is_empty() {
1287 bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1288 }
1289
1290 let server_policy = match server_policy_file {
1295 Some(path) => Some(PolicyEngine::load_server(path)?),
1296 None => None,
1297 };
1298
1299 let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
1304 .map(|cfg| async move { open_single_graph(cfg).await })
1305 .buffer_unordered(4)
1306 .try_collect()
1307 .await?;
1308
1309 let workload = workload::WorkloadController::from_env();
1310 let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1311 .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1312 Ok(state)
1313}
1314
1315async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1318 let graph_id = GraphId::try_from(cfg.graph_id.clone())
1319 .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1320 let uri = normalize_root_uri(&cfg.uri)
1321 .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1322
1323 let db = Omnigraph::open(&uri)
1324 .await
1325 .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1326
1327 let queries = validate_and_attach(cfg.queries, &db.catalog(), graph_id.as_str())?;
1332
1333 let (policy_arc, db) = match &cfg.policy_file {
1334 Some(path) => {
1335 let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
1336 let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1337 let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1338 (Some(policy_arc), db.with_policy(checker))
1339 }
1340 None => (None, db),
1341 };
1342
1343 Ok(Arc::new(GraphHandle {
1344 key: GraphKey::cluster(graph_id),
1345 uri,
1346 engine: Arc::new(db),
1347 policy: policy_arc,
1348 queries,
1349 }))
1350}
1351
1352async fn shutdown_signal() {
1353 if let Err(err) = tokio::signal::ctrl_c().await {
1354 error!(error = %err, "failed to install ctrl-c handler");
1355 return;
1356 }
1357 info!("shutdown signal received");
1358}
1359
1360#[utoipa::path(
1361 get,
1362 path = "/healthz",
1363 tag = "health",
1364 operation_id = "health",
1365 responses(
1366 (status = 200, description = "Server is healthy", body = HealthOutput),
1367 ),
1368)]
1369async fn server_health() -> Json<HealthOutput> {
1375 Json(HealthOutput {
1376 status: "ok".to_string(),
1377 version: SERVER_VERSION.to_string(),
1378 source_version: SERVER_SOURCE_VERSION.map(str::to_string),
1379 })
1380}
1381
1382#[utoipa::path(
1383 get,
1384 path = "/graphs",
1385 tag = "management",
1386 operation_id = "listGraphs",
1387 responses(
1388 (status = 200, description = "List of registered graphs", body = GraphListResponse),
1389 (status = 401, description = "Unauthorized", body = ErrorOutput),
1390 (status = 403, description = "Forbidden", body = ErrorOutput),
1391 (status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput),
1392 ),
1393 security(("bearer_token" = [])),
1394)]
1395async fn server_graphs_list(
1404 State(state): State<AppState>,
1405 actor: Option<Extension<ResolvedActor>>,
1406) -> std::result::Result<Json<GraphListResponse>, ApiError> {
1407 let registry = match state.routing() {
1410 GraphRouting::Single { .. } => {
1411 return Err(ApiError::method_not_allowed(
1412 "GET /graphs is only available in multi-graph mode",
1413 ));
1414 }
1415 GraphRouting::Multi { registry, .. } => registry,
1416 };
1417
1418 authorize_request(
1427 actor.as_ref().map(|Extension(actor)| actor),
1428 state.server_policy.as_deref(),
1429 PolicyRequest {
1430 action: PolicyAction::GraphList,
1431 branch: None,
1432 target_branch: None,
1433 },
1434 )?;
1435
1436 let mut graphs: Vec<GraphInfo> = registry
1437 .list()
1438 .into_iter()
1439 .map(|handle| GraphInfo {
1440 graph_id: handle.key.graph_id.as_str().to_string(),
1441 uri: handle.uri.clone(),
1442 })
1443 .collect();
1444 graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
1445 Ok(Json(GraphListResponse { graphs }))
1446}
1447
1448async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
1449 let mut doc = ApiDoc::openapi();
1450 if !state.requires_bearer_auth() {
1451 strip_security(&mut doc);
1452 }
1453 if matches!(state.routing(), GraphRouting::Multi { .. }) {
1458 nest_paths_under_cluster_prefix(&mut doc);
1459 }
1460 Json(doc)
1461}
1462
1463const CLUSTER_PATH_PREFIX: &str = "/graphs/{graph_id}";
1466
1467const CLUSTER_OPERATION_ID_PREFIX: &str = "cluster_";
1472
1473const ALWAYS_FLAT_PATHS: &[&str] = &["/healthz", "/graphs"];
1479
1480fn nest_paths_under_cluster_prefix(doc: &mut utoipa::openapi::OpenApi) {
1491 let original = std::mem::take(&mut doc.paths.paths);
1492 let mut rewritten = std::collections::BTreeMap::new();
1493 for (path, mut item) in original {
1494 if ALWAYS_FLAT_PATHS.contains(&path.as_str()) {
1495 rewritten.insert(path, item);
1496 continue;
1497 }
1498 rename_operation_ids(&mut item, CLUSTER_OPERATION_ID_PREFIX);
1499 add_cluster_graph_id_parameter(&mut item);
1500 let new_path = format!("{CLUSTER_PATH_PREFIX}{path}");
1501 rewritten.insert(new_path, item);
1502 }
1503 doc.paths.paths = rewritten;
1504}
1505
1506fn add_cluster_graph_id_parameter(item: &mut utoipa::openapi::PathItem) {
1507 for op in path_item_operations_mut(item) {
1508 let parameters = op.parameters.get_or_insert_with(Vec::new);
1509 let has_graph_id = parameters
1510 .iter()
1511 .any(|param| param.name == "graph_id" && param.parameter_in == ParameterIn::Path);
1512 if !has_graph_id {
1513 parameters.insert(0, graph_id_path_parameter());
1514 }
1515 }
1516}
1517
1518fn graph_id_path_parameter() -> Parameter {
1519 let mut parameter = Parameter::new("graph_id");
1520 parameter.parameter_in = ParameterIn::Path;
1521 parameter.description = Some("Graph id to route the request to.".to_string());
1522 parameter.schema = Some(Object::with_type(Type::String).into());
1523 parameter
1524}
1525
1526fn rename_operation_ids(item: &mut utoipa::openapi::PathItem, prefix: &str) {
1528 for op in path_item_operations_mut(item) {
1529 if let Some(id) = op.operation_id.as_deref() {
1530 op.operation_id = Some(format!("{prefix}{id}"));
1531 }
1532 }
1533}
1534
1535fn path_item_operations_mut(
1536 item: &mut utoipa::openapi::PathItem,
1537) -> impl Iterator<Item = &mut utoipa::openapi::path::Operation> {
1538 [
1539 item.get.as_mut(),
1540 item.post.as_mut(),
1541 item.put.as_mut(),
1542 item.delete.as_mut(),
1543 item.options.as_mut(),
1544 item.head.as_mut(),
1545 item.patch.as_mut(),
1546 item.trace.as_mut(),
1547 ]
1548 .into_iter()
1549 .flatten()
1550}
1551
1552fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
1553 if let Some(components) = doc.components.as_mut() {
1554 components.security_schemes.clear();
1555 }
1556 for path_item in doc.paths.paths.values_mut() {
1557 for op in [
1558 path_item.get.as_mut(),
1559 path_item.post.as_mut(),
1560 path_item.put.as_mut(),
1561 path_item.delete.as_mut(),
1562 path_item.options.as_mut(),
1563 path_item.head.as_mut(),
1564 path_item.patch.as_mut(),
1565 path_item.trace.as_mut(),
1566 ]
1567 .into_iter()
1568 .flatten()
1569 {
1570 op.security = None;
1571 }
1572 }
1573}
1574
1575async fn require_bearer_auth(
1576 State(state): State<AppState>,
1577 mut request: Request,
1578 next: Next,
1579) -> std::result::Result<Response, ApiError> {
1580 if !state.requires_bearer_auth() {
1581 return Ok(next.run(request).await);
1582 }
1583
1584 let Some(header) = request
1585 .headers()
1586 .get(AUTHORIZATION)
1587 .and_then(|value| value.to_str().ok())
1588 else {
1589 return Err(ApiError::unauthorized("missing bearer token"));
1590 };
1591
1592 let Some(provided_token) = header.strip_prefix("Bearer ") else {
1593 return Err(ApiError::unauthorized("missing bearer token"));
1594 };
1595
1596 let Some(actor) = state.authenticate_bearer_token(provided_token) else {
1597 return Err(ApiError::unauthorized("invalid bearer token"));
1598 };
1599 request.extensions_mut().insert(actor);
1600
1601 Ok(next.run(request).await)
1602}
1603
1604async fn resolve_graph_handle(
1620 State(state): State<AppState>,
1621 mut request: Request,
1622 next: Next,
1623) -> std::result::Result<Response, ApiError> {
1624 let handle = match &state.routing {
1625 GraphRouting::Single { handle } => Arc::clone(handle),
1626 GraphRouting::Multi { registry, .. } => {
1627 let original_path: String = request
1635 .extensions()
1636 .get::<OriginalUri>()
1637 .map(|OriginalUri(uri)| uri.path().to_string())
1638 .unwrap_or_else(|| request.uri().path().to_string());
1639 let graph_id_str = original_path
1640 .strip_prefix("/graphs/")
1641 .and_then(|rest| rest.split('/').next())
1642 .filter(|s| !s.is_empty())
1643 .ok_or_else(|| {
1644 ApiError::bad_request(
1645 "cluster route missing /graphs/{graph_id} prefix".to_string(),
1646 )
1647 })?;
1648 let graph_id = GraphId::try_from(graph_id_str.to_string())
1649 .map_err(|err| ApiError::bad_request(err.to_string()))?;
1650 let key = GraphKey::cluster(graph_id.clone());
1651 match registry.get(&key) {
1652 RegistryLookup::Ready(handle) => handle,
1653 RegistryLookup::Gone => {
1654 return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
1655 }
1656 }
1657 }
1658 };
1659
1660 info!(graph_id = %handle.key.graph_id, "graph routed");
1665
1666 request.extensions_mut().insert(handle);
1667 Ok(next.run(request).await)
1668}
1669
1670fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
1671 info!(
1672 actor_id = actor_id,
1673 action = %request.action,
1674 branch = request.branch.as_deref().unwrap_or(""),
1675 target_branch = request.target_branch.as_deref().unwrap_or(""),
1676 allowed = decision.allowed,
1677 matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
1678 "policy decision"
1679 );
1680}
1681
1682enum Authz {
1690 Allowed,
1691 Denied(String),
1692}
1693
1694fn authorize(
1711 actor: Option<&ResolvedActor>,
1712 policy: Option<&PolicyEngine>,
1713 request: PolicyRequest,
1714) -> std::result::Result<Authz, ApiError> {
1715 let Some(engine) = policy else {
1716 if request.action.resource_kind() == PolicyResourceKind::Server {
1741 return Ok(Authz::Denied(
1742 "server-scoped actions require an explicit `server.policy.file` \
1743 configured in omnigraph.yaml — the management surface is closed \
1744 by default in every runtime state, including --unauthenticated, \
1745 so that server topology is never exposed without operator opt-in."
1746 .to_string(),
1747 ));
1748 }
1749 if actor.is_some() && request.action != PolicyAction::Read {
1750 return Ok(Authz::Denied(
1751 "server runs in default-deny mode (bearer tokens configured but no \
1752 policy file). Only `read` actions are permitted; configure \
1753 `policy.file` in omnigraph.yaml to enable other actions."
1754 .to_string(),
1755 ));
1756 }
1757 return Ok(Authz::Allowed);
1758 };
1759 let Some(actor) = actor else {
1760 return Err(ApiError::unauthorized("missing bearer token"));
1761 };
1762 let actor_id = actor.actor_id.as_ref();
1774 let decision = engine
1775 .authorize(actor_id, &request)
1776 .map_err(|err| ApiError::internal(format!("policy: {err}")))?;
1777 log_policy_decision(actor_id, &request, &decision);
1778 if decision.allowed {
1779 Ok(Authz::Allowed)
1780 } else {
1781 Ok(Authz::Denied(decision.message))
1782 }
1783}
1784
1785fn authorize_request(
1792 actor: Option<&ResolvedActor>,
1793 policy: Option<&PolicyEngine>,
1794 request: PolicyRequest,
1795) -> std::result::Result<(), ApiError> {
1796 match authorize(actor, policy, request)? {
1797 Authz::Allowed => Ok(()),
1798 Authz::Denied(message) => Err(ApiError::forbidden(message)),
1799 }
1800}
1801
1802#[utoipa::path(
1803 get,
1804 path = "/snapshot",
1805 tag = "snapshots",
1806 operation_id = "getSnapshot",
1807 params(SnapshotQuery),
1808 responses(
1809 (status = 200, description = "Database snapshot", body = api::SnapshotOutput),
1810 (status = 401, description = "Unauthorized", body = ErrorOutput),
1811 (status = 403, description = "Forbidden", body = ErrorOutput),
1812 ),
1813 security(("bearer_token" = [])),
1814)]
1815async fn server_snapshot(
1821 Extension(handle): Extension<Arc<GraphHandle>>,
1822 actor: Option<Extension<ResolvedActor>>,
1823 Query(query): Query<SnapshotQuery>,
1824) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
1825 let branch = query.branch.unwrap_or_else(|| "main".to_string());
1826 authorize_request(
1827 actor.as_ref().map(|Extension(actor)| actor),
1828 handle.policy.as_deref(),
1829 PolicyRequest {
1830 action: PolicyAction::Read,
1831 branch: Some(branch.clone()),
1832 target_branch: None,
1833 },
1834 )?;
1835 let snapshot = {
1836 let db = &handle.engine;
1837 db.snapshot_of(ReadTarget::branch(branch.as_str()))
1838 .await
1839 .map_err(ApiError::from_omni)?
1840 };
1841 Ok(Json(snapshot_payload(&branch, &snapshot)))
1842}
1843
1844fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValue); 2] {
1847 [
1848 (
1849 HeaderName::from_static("deprecation"),
1850 HeaderValue::from_static("true"),
1851 ),
1852 (
1853 HeaderName::from_static("link"),
1854 HeaderValue::from_static(successor_link),
1855 ),
1856 ]
1857}
1858
1859#[utoipa::path(
1860 post,
1861 path = "/read",
1862 tag = "queries",
1863 operation_id = "read",
1864 request_body = ReadRequest,
1865 responses(
1866 (status = 200, description = "Query results (response includes `Deprecation: true` + `Link: </query>; rel=\"successor-version\"`)", body = ReadOutput),
1867 (status = 400, description = "Bad request", body = ErrorOutput),
1868 (status = 401, description = "Unauthorized", body = ErrorOutput),
1869 (status = 403, description = "Forbidden", body = ErrorOutput),
1870 ),
1871 security(("bearer_token" = [])),
1872)]
1873#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
1874async fn server_read(
1884 Extension(handle): Extension<Arc<GraphHandle>>,
1885 actor: Option<Extension<ResolvedActor>>,
1886 Json(request): Json<ReadRequest>,
1887) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ReadOutput>), ApiError> {
1888 let (selected_name, target, result) = run_query(
1889 handle,
1890 actor.as_ref().map(|Extension(actor)| actor),
1891 &request.query_source,
1892 request.query_name.as_deref(),
1893 request.params.as_ref(),
1894 request.branch,
1895 request.snapshot,
1896 false, )
1898 .await?;
1899 Ok((
1900 deprecation_headers("</query>; rel=\"successor-version\""),
1901 Json(api::read_output(selected_name, &target, result)),
1902 ))
1903}
1904
1905#[utoipa::path(
1906 post,
1907 path = "/query",
1908 tag = "queries",
1909 operation_id = "query",
1910 request_body = QueryRequest,
1911 responses(
1912 (status = 200, description = "Query results", body = ReadOutput),
1913 (status = 400, description = "Bad request - also returned when the query body contains mutations; use POST /mutate (or its deprecated alias POST /change) for write queries", body = ErrorOutput),
1914 (status = 401, description = "Unauthorized", body = ErrorOutput),
1915 (status = 403, description = "Forbidden", body = ErrorOutput),
1916 ),
1917 security(("bearer_token" = [])),
1918)]
1919async fn server_query(
1929 Extension(handle): Extension<Arc<GraphHandle>>,
1930 actor: Option<Extension<ResolvedActor>>,
1931 Json(request): Json<QueryRequest>,
1932) -> std::result::Result<Json<ReadOutput>, ApiError> {
1933 let (selected_name, target, result) = run_query(
1934 handle,
1935 actor.as_ref().map(|Extension(actor)| actor),
1936 &request.query,
1937 request.name.as_deref(),
1938 request.params.as_ref(),
1939 request.branch,
1940 request.snapshot,
1941 true, )
1943 .await?;
1944 Ok(Json(api::read_output(selected_name, &target, result)))
1945}
1946
1947#[utoipa::path(
1948 post,
1949 path = "/export",
1950 tag = "queries",
1951 operation_id = "export",
1952 request_body = ExportRequest,
1953 responses(
1954 (status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
1955 (status = 400, description = "Bad request", body = ErrorOutput),
1956 (status = 401, description = "Unauthorized", body = ErrorOutput),
1957 (status = 403, description = "Forbidden", body = ErrorOutput),
1958 ),
1959 security(("bearer_token" = [])),
1960)]
1961async fn server_export(
1968 Extension(handle): Extension<Arc<GraphHandle>>,
1969 actor: Option<Extension<ResolvedActor>>,
1970 Json(request): Json<ExportRequest>,
1971) -> std::result::Result<Response, ApiError> {
1972 let branch = request.branch.unwrap_or_else(|| "main".to_string());
1973 authorize_request(
1974 actor.as_ref().map(|Extension(actor)| actor),
1975 handle.policy.as_deref(),
1976 PolicyRequest {
1977 action: PolicyAction::Export,
1978 branch: Some(branch.clone()),
1979 target_branch: None,
1980 },
1981 )?;
1982 let engine = Arc::clone(&handle.engine);
1983 let type_names = request.type_names.clone();
1984 let table_keys = request.table_keys.clone();
1985 let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
1986 tokio::spawn(async move {
1987 let result = {
1988 let mut writer = ExportStreamWriter { sender: tx.clone() };
1989 engine
1990 .export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
1991 .await
1992 };
1993 if let Err(err) = result {
1994 let _ = tx.send(Err(io::Error::other(err.to_string())));
1995 }
1996 });
1997 let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
1998 rx.recv().await.map(|item| (item, rx))
1999 }));
2000 Ok((
2001 StatusCode::OK,
2002 [(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
2003 body,
2004 )
2005 .into_response())
2006}
2007
2008async fn run_mutate(
2019 state: AppState,
2020 handle: Arc<GraphHandle>,
2021 actor: Option<&ResolvedActor>,
2022 query: &str,
2023 name: Option<&str>,
2024 params_json: Option<&Value>,
2025 branch: String,
2026) -> std::result::Result<ChangeOutput, ApiError> {
2027 let actor_arc = actor
2028 .map(|a| Arc::clone(&a.actor_id))
2029 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2030 let actor_id = actor.map(|a| a.actor_id.as_ref());
2031 authorize_request(
2032 actor,
2033 handle.policy.as_deref(),
2034 PolicyRequest {
2035 action: PolicyAction::Change,
2036 branch: Some(branch.clone()),
2037 target_branch: None,
2038 },
2039 )?;
2040 let est_bytes = query.len() as u64
2045 + params_json
2046 .map(|p| p.to_string().len() as u64)
2047 .unwrap_or(0);
2048 let _admission = state
2049 .workload
2050 .try_admit(&actor_arc, est_bytes)
2051 .map_err(ApiError::from_workload_reject)?;
2052 let (selected_name, query_params) =
2053 select_named_query(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
2054 let params = query_params_from_json(&query_params, params_json)
2055 .map_err(|err| ApiError::bad_request(err.to_string()))?;
2056
2057 let result = {
2058 let db = &handle.engine;
2059 db.mutate_as(&branch, query, &selected_name, ¶ms, actor_id)
2060 .await
2061 .map_err(ApiError::from_omni)?
2062 };
2063 Ok(ChangeOutput {
2064 branch,
2065 query_name: selected_name,
2066 affected_nodes: result.affected_nodes,
2067 affected_edges: result.affected_edges,
2068 actor_id: actor_id.map(str::to_string),
2069 })
2070}
2071
2072async fn run_query(
2085 handle: Arc<GraphHandle>,
2086 actor: Option<&ResolvedActor>,
2087 query: &str,
2088 name: Option<&str>,
2089 params_json: Option<&Value>,
2090 branch: Option<String>,
2091 snapshot: Option<String>,
2092 reject_mutations: bool,
2093) -> std::result::Result<(String, ReadTarget, omnigraph_compiler::result::QueryResult), ApiError> {
2094 if branch.is_some() && snapshot.is_some() {
2095 return Err(ApiError::bad_request(
2096 "request may specify branch or snapshot, not both",
2097 ));
2098 }
2099
2100 let target = read_target_from_request(branch, snapshot);
2101 let policy_branch = match &target {
2102 ReadTarget::Branch(branch) => Some(branch.clone()),
2103 ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
2104 let db = &handle.engine;
2105 db.resolved_branch_of(target.clone())
2106 .await
2107 .map(|branch| branch.or_else(|| Some("main".to_string())))
2108 .map_err(ApiError::from_omni)?
2109 }
2110 ReadTarget::Snapshot(_) => None,
2111 };
2112 authorize_request(
2113 actor,
2114 handle.policy.as_deref(),
2115 PolicyRequest {
2116 action: PolicyAction::Read,
2117 branch: policy_branch,
2118 target_branch: None,
2119 },
2120 )?;
2121 let query_decl =
2122 select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
2123 if reject_mutations && !query_decl.mutations.is_empty() {
2124 return Err(ApiError::bad_request(format!(
2125 "query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
2126 query_decl.name
2127 )));
2128 }
2129 let selected_name = query_decl.name.clone();
2130 let params = query_params_from_json(&query_decl.params, params_json)
2131 .map_err(|err| ApiError::bad_request(err.to_string()))?;
2132
2133 let result = {
2134 let db = &handle.engine;
2135 db.query(target.clone(), query, &selected_name, ¶ms)
2136 .await
2137 .map_err(ApiError::from_omni)?
2138 };
2139 Ok((selected_name, target, result))
2140}
2141
2142#[utoipa::path(
2143 post,
2144 path = "/change",
2145 tag = "mutations",
2146 operation_id = "change",
2147 request_body = ChangeRequest,
2148 responses(
2149 (status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: </mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
2150 (status = 400, description = "Bad request", body = ErrorOutput),
2151 (status = 401, description = "Unauthorized", body = ErrorOutput),
2152 (status = 403, description = "Forbidden", body = ErrorOutput),
2153 (status = 409, description = "Merge conflict", body = ErrorOutput),
2154 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2155 ),
2156 security(("bearer_token" = [])),
2157)]
2158#[deprecated(note = "use POST /mutate instead; /change is kept indefinitely for back-compat")]
2159async fn server_change(
2169 State(state): State<AppState>,
2170 Extension(handle): Extension<Arc<GraphHandle>>,
2171 actor: Option<Extension<ResolvedActor>>,
2172 Json(request): Json<ChangeRequest>,
2173) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ChangeOutput>), ApiError> {
2174 let branch = request.branch.unwrap_or_else(|| "main".to_string());
2175 let output = run_mutate(
2176 state,
2177 handle,
2178 actor.as_ref().map(|Extension(actor)| actor),
2179 &request.query,
2180 request.name.as_deref(),
2181 request.params.as_ref(),
2182 branch,
2183 )
2184 .await?;
2185 Ok((
2186 deprecation_headers("</mutate>; rel=\"successor-version\""),
2187 Json(output),
2188 ))
2189}
2190
2191#[utoipa::path(
2192 post,
2193 path = "/mutate",
2194 tag = "mutations",
2195 operation_id = "mutate",
2196 request_body = ChangeRequest,
2197 responses(
2198 (status = 200, description = "Mutation results", body = ChangeOutput),
2199 (status = 400, description = "Bad request", body = ErrorOutput),
2200 (status = 401, description = "Unauthorized", body = ErrorOutput),
2201 (status = 403, description = "Forbidden", body = ErrorOutput),
2202 (status = 409, description = "Merge conflict", body = ErrorOutput),
2203 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2204 ),
2205 security(("bearer_token" = [])),
2206)]
2207async fn server_mutate(
2217 State(state): State<AppState>,
2218 Extension(handle): Extension<Arc<GraphHandle>>,
2219 actor: Option<Extension<ResolvedActor>>,
2220 Json(request): Json<ChangeRequest>,
2221) -> std::result::Result<Json<ChangeOutput>, ApiError> {
2222 let branch = request.branch.unwrap_or_else(|| "main".to_string());
2223 Ok(Json(
2224 run_mutate(
2225 state,
2226 handle,
2227 actor.as_ref().map(|Extension(actor)| actor),
2228 &request.query,
2229 request.name.as_deref(),
2230 request.params.as_ref(),
2231 branch,
2232 )
2233 .await?,
2234 ))
2235}
2236
2237#[derive(Deserialize)]
2239struct QueryNamePath {
2240 name: String,
2241}
2242
2243fn parse_optional_invoke_body(
2244 body: Bytes,
2245) -> std::result::Result<InvokeStoredQueryRequest, ApiError> {
2246 if body.is_empty() {
2247 return Ok(InvokeStoredQueryRequest::default());
2248 }
2249 serde_json::from_slice::<Option<InvokeStoredQueryRequest>>(&body)
2250 .map(|request| request.unwrap_or_default())
2251 .map_err(|err| {
2252 ApiError::bad_request(format!("invalid stored-query invocation body: {err}"))
2253 })
2254}
2255
2256#[utoipa::path(
2257 post,
2258 path = "/queries/{name}",
2259 tag = "queries",
2260 operation_id = "invoke_query",
2261 params(("name" = String, Path, description = "Stored query name (the registry key)")),
2262 request_body = Option<InvokeStoredQueryRequest>,
2263 responses(
2264 (status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse),
2265 (status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput),
2266 (status = 401, description = "Unauthorized", body = ErrorOutput),
2267 (status = 403, description = "Forbidden (the inner `change` gate for a stored mutation)", body = ErrorOutput),
2268 (status = 404, description = "Unknown stored query, or `invoke_query` denied — indistinguishable to a caller without the grant", body = ErrorOutput),
2269 (status = 409, description = "Merge conflict", body = ErrorOutput),
2270 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2271 (status = 500, description = "Policy evaluation error (a denial is reported as 404, not 500)", body = ErrorOutput),
2272 ),
2273 security(("bearer_token" = [])),
2274)]
2275async fn server_invoke_query(
2287 State(state): State<AppState>,
2288 Extension(handle): Extension<Arc<GraphHandle>>,
2289 actor: Option<Extension<ResolvedActor>>,
2290 Path(QueryNamePath { name }): Path<QueryNamePath>,
2291 body: Bytes,
2292) -> std::result::Result<Json<InvokeStoredQueryResponse>, ApiError> {
2293 let req = parse_optional_invoke_body(body)?;
2294 const NOT_FOUND: &str = "stored query not found";
2299 let actor_ref = actor.as_ref().map(|Extension(actor)| actor);
2300
2301 match authorize(
2307 actor_ref,
2308 handle.policy.as_deref(),
2309 PolicyRequest {
2310 action: PolicyAction::InvokeQuery,
2311 branch: None,
2316 target_branch: None,
2317 },
2318 )? {
2319 Authz::Allowed => {}
2320 Authz::Denied(_) => return Err(ApiError::not_found(NOT_FOUND)),
2321 }
2322
2323 let stored = handle
2325 .queries
2326 .as_ref()
2327 .and_then(|registry| registry.lookup(&name))
2328 .ok_or_else(|| ApiError::not_found(NOT_FOUND))?;
2329
2330 let source = Arc::clone(&stored.source);
2333 let query_name = stored.name.clone();
2334 let is_mutation = stored.is_mutation();
2335
2336 info!(
2337 graph = %handle.uri,
2338 actor = ?actor_ref.map(|a| a.actor_id.as_ref()),
2339 query = %query_name,
2340 kind = if is_mutation { "mutate" } else { "read" },
2341 "stored query invoked"
2342 );
2343
2344 if is_mutation {
2345 if req.snapshot.is_some() {
2346 return Err(ApiError::bad_request(
2347 "stored mutation cannot target a snapshot",
2348 ));
2349 }
2350 let branch = req.branch.unwrap_or_else(|| "main".to_string());
2351 let output = run_mutate(
2352 state,
2353 handle,
2354 actor_ref,
2355 &source,
2356 Some(&query_name),
2357 req.params.as_ref(),
2358 branch,
2359 )
2360 .await?;
2361 Ok(Json(InvokeStoredQueryResponse::Change(output)))
2362 } else {
2363 let (selected, target, result) = run_query(
2364 handle,
2365 actor_ref,
2366 &source,
2367 Some(&query_name),
2368 req.params.as_ref(),
2369 req.branch,
2370 req.snapshot,
2371 true,
2372 )
2373 .await?;
2374 Ok(Json(InvokeStoredQueryResponse::Read(api::read_output(
2375 selected, &target, result,
2376 ))))
2377 }
2378}
2379
2380#[utoipa::path(
2381 get,
2382 path = "/queries",
2383 tag = "queries",
2384 operation_id = "list_queries",
2385 responses(
2386 (status = 200, description = "Stored-query catalog (the mcp.expose subset, with typed params)", body = QueriesCatalogOutput),
2387 (status = 401, description = "Unauthorized", body = ErrorOutput),
2388 (status = 403, description = "Forbidden", body = ErrorOutput),
2389 ),
2390 security(("bearer_token" = [])),
2391)]
2392async fn server_list_queries(
2402 Extension(handle): Extension<Arc<GraphHandle>>,
2403 actor: Option<Extension<ResolvedActor>>,
2404) -> std::result::Result<Json<QueriesCatalogOutput>, ApiError> {
2405 authorize_request(
2406 actor.as_ref().map(|Extension(actor)| actor),
2407 handle.policy.as_deref(),
2408 PolicyRequest {
2409 action: PolicyAction::Read,
2410 branch: Some("main".to_string()),
2411 target_branch: None,
2412 },
2413 )?;
2414 let queries = match handle.queries.as_ref() {
2415 Some(registry) => registry
2416 .iter()
2417 .filter(|q| q.expose)
2418 .map(api::query_catalog_entry)
2419 .collect(),
2420 None => Vec::new(),
2421 };
2422 Ok(Json(QueriesCatalogOutput { queries }))
2423}
2424
2425#[utoipa::path(
2426 get,
2427 path = "/schema",
2428 tag = "schema",
2429 operation_id = "getSchema",
2430 responses(
2431 (status = 200, description = "Current schema source", body = SchemaOutput),
2432 (status = 401, description = "Unauthorized", body = ErrorOutput),
2433 (status = 403, description = "Forbidden", body = ErrorOutput),
2434 ),
2435 security(("bearer_token" = [])),
2436)]
2437async fn server_schema_get(
2443 Extension(handle): Extension<Arc<GraphHandle>>,
2444 actor: Option<Extension<ResolvedActor>>,
2445) -> std::result::Result<Json<SchemaOutput>, ApiError> {
2446 authorize_request(
2447 actor.as_ref().map(|Extension(actor)| actor),
2448 handle.policy.as_deref(),
2449 PolicyRequest {
2450 action: PolicyAction::Read,
2451 branch: None,
2452 target_branch: None,
2453 },
2454 )?;
2455 let schema_source = {
2456 let db = &handle.engine;
2457 db.schema_source().to_string()
2458 };
2459 Ok(Json(SchemaOutput { schema_source }))
2460}
2461
2462#[utoipa::path(
2463 post,
2464 path = "/schema/apply",
2465 tag = "mutations",
2466 operation_id = "applySchema",
2467 request_body = SchemaApplyRequest,
2468 responses(
2469 (status = 200, description = "Schema apply results", body = SchemaApplyOutput),
2470 (status = 400, description = "Bad request", body = ErrorOutput),
2471 (status = 401, description = "Unauthorized", body = ErrorOutput),
2472 (status = 403, description = "Forbidden", body = ErrorOutput),
2473 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2474 ),
2475 security(("bearer_token" = [])),
2476)]
2477async fn server_schema_apply(
2484 State(state): State<AppState>,
2485 Extension(handle): Extension<Arc<GraphHandle>>,
2486 actor: Option<Extension<ResolvedActor>>,
2487 Json(request): Json<SchemaApplyRequest>,
2488) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
2489 let actor_arc = actor
2490 .as_ref()
2491 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2492 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2493 let actor_id = actor
2494 .as_ref()
2495 .map(|Extension(actor)| actor.actor_id.as_ref());
2496 authorize_request(
2497 actor.as_ref().map(|Extension(actor)| actor),
2498 handle.policy.as_deref(),
2499 PolicyRequest {
2500 action: PolicyAction::SchemaApply,
2501 branch: None,
2502 target_branch: Some("main".to_string()),
2503 },
2504 )?;
2505 let est_bytes = request.schema_source.len() as u64;
2506 let _admission = state
2507 .workload
2508 .try_admit(&actor_arc, est_bytes)
2509 .map_err(ApiError::from_workload_reject)?;
2510 let result = {
2511 let db = &handle.engine;
2512 let registry = handle.queries.as_deref();
2513 let label = handle.key.graph_id.as_str().to_string();
2514 db.apply_schema_as_with_catalog_check(
2521 &request.schema_source,
2522 omnigraph::db::SchemaApplyOptions {
2523 allow_data_loss: request.allow_data_loss,
2524 },
2525 actor_id,
2526 |catalog| {
2527 if let Some(registry) = registry {
2528 validate_registry_against_catalog(registry, catalog, &label)?;
2529 }
2530 Ok(())
2531 },
2532 )
2533 .await
2534 .map_err(ApiError::from_omni)?
2535 };
2536 Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
2537}
2538
2539#[utoipa::path(
2540 post,
2541 path = "/ingest",
2542 tag = "mutations",
2543 operation_id = "ingest",
2544 request_body = IngestRequest,
2545 responses(
2546 (status = 200, description = "Ingest results", body = IngestOutput),
2547 (status = 400, description = "Bad request", body = ErrorOutput),
2548 (status = 401, description = "Unauthorized", body = ErrorOutput),
2549 (status = 403, description = "Forbidden", body = ErrorOutput),
2550 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2551 ),
2552 security(("bearer_token" = [])),
2553)]
2554async fn server_ingest(
2562 State(state): State<AppState>,
2563 Extension(handle): Extension<Arc<GraphHandle>>,
2564 actor: Option<Extension<ResolvedActor>>,
2565 Json(request): Json<IngestRequest>,
2566) -> std::result::Result<Json<IngestOutput>, ApiError> {
2567 let branch = request.branch.unwrap_or_else(|| "main".to_string());
2568 let from = request.from.unwrap_or_else(|| "main".to_string());
2569 let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
2570 let actor_arc = actor
2571 .as_ref()
2572 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2573 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2574 let actor_id = actor
2575 .as_ref()
2576 .map(|Extension(actor)| actor.actor_id.as_ref());
2577
2578 let branch_exists = {
2579 let db = &handle.engine;
2580 db.branch_list()
2581 .await
2582 .map_err(ApiError::from_omni)?
2583 .into_iter()
2584 .any(|name| name == branch)
2585 };
2586
2587 if !branch_exists {
2588 authorize_request(
2589 actor.as_ref().map(|Extension(actor)| actor),
2590 handle.policy.as_deref(),
2591 PolicyRequest {
2592 action: PolicyAction::BranchCreate,
2593 branch: Some(from.clone()),
2594 target_branch: Some(branch.clone()),
2595 },
2596 )?;
2597 }
2598 authorize_request(
2599 actor.as_ref().map(|Extension(actor)| actor),
2600 handle.policy.as_deref(),
2601 PolicyRequest {
2602 action: PolicyAction::Change,
2603 branch: Some(branch.clone()),
2604 target_branch: None,
2605 },
2606 )?;
2607 let est_bytes = request.data.len() as u64;
2608 let _admission = state
2609 .workload
2610 .try_admit(&actor_arc, est_bytes)
2611 .map_err(ApiError::from_workload_reject)?;
2612
2613 let result = {
2614 let db = &handle.engine;
2615 db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
2616 .await
2617 .map_err(ApiError::from_omni)?
2618 };
2619
2620 Ok(Json(ingest_output(
2621 handle.uri.as_str(),
2622 &result,
2623 actor_id.map(str::to_string),
2624 )))
2625}
2626
2627#[utoipa::path(
2628 get,
2629 path = "/branches",
2630 tag = "branches",
2631 operation_id = "listBranches",
2632 responses(
2633 (status = 200, description = "List of branches", body = BranchListOutput),
2634 (status = 401, description = "Unauthorized", body = ErrorOutput),
2635 (status = 403, description = "Forbidden", body = ErrorOutput),
2636 ),
2637 security(("bearer_token" = [])),
2638)]
2639async fn server_branch_list(
2643 Extension(handle): Extension<Arc<GraphHandle>>,
2644 actor: Option<Extension<ResolvedActor>>,
2645) -> std::result::Result<Json<BranchListOutput>, ApiError> {
2646 authorize_request(
2647 actor.as_ref().map(|Extension(actor)| actor),
2648 handle.policy.as_deref(),
2649 PolicyRequest {
2650 action: PolicyAction::Read,
2651 branch: None,
2652 target_branch: None,
2653 },
2654 )?;
2655 let mut branches = {
2656 let db = &handle.engine;
2657 db.branch_list().await.map_err(ApiError::from_omni)?
2658 };
2659 branches.sort();
2660 Ok(Json(BranchListOutput { branches }))
2661}
2662
2663#[utoipa::path(
2664 post,
2665 path = "/branches",
2666 tag = "branches",
2667 operation_id = "createBranch",
2668 request_body = BranchCreateRequest,
2669 responses(
2670 (status = 200, description = "Branch created", body = BranchCreateOutput),
2671 (status = 400, description = "Bad request", body = ErrorOutput),
2672 (status = 401, description = "Unauthorized", body = ErrorOutput),
2673 (status = 403, description = "Forbidden", body = ErrorOutput),
2674 (status = 409, description = "Branch already exists", body = ErrorOutput),
2675 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2676 ),
2677 security(("bearer_token" = [])),
2678)]
2679async fn server_branch_create(
2685 State(state): State<AppState>,
2686 Extension(handle): Extension<Arc<GraphHandle>>,
2687 actor: Option<Extension<ResolvedActor>>,
2688 Json(request): Json<BranchCreateRequest>,
2689) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
2690 let from = request.from.unwrap_or_else(|| "main".to_string());
2691 let actor_arc = actor
2692 .as_ref()
2693 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2694 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2695 authorize_request(
2696 actor.as_ref().map(|Extension(actor)| actor),
2697 handle.policy.as_deref(),
2698 PolicyRequest {
2699 action: PolicyAction::BranchCreate,
2700 branch: Some(from.clone()),
2701 target_branch: Some(request.name.clone()),
2702 },
2703 )?;
2704 let _admission = state
2708 .workload
2709 .try_admit(&actor_arc, 256)
2710 .map_err(ApiError::from_workload_reject)?;
2711 {
2712 let db = &handle.engine;
2713 db.branch_create_from_as(
2714 ReadTarget::branch(&from),
2715 &request.name,
2716 actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
2717 )
2718 .await
2719 .map_err(ApiError::from_omni)?;
2720 }
2721 Ok(Json(BranchCreateOutput {
2722 uri: handle.uri.clone(),
2723 from,
2724 name: request.name,
2725 actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
2726 }))
2727}
2728
2729#[derive(Deserialize)]
2739struct BranchPath {
2740 branch: String,
2741}
2742
2743#[utoipa::path(
2744 delete,
2745 path = "/branches/{branch}",
2746 tag = "branches",
2747 operation_id = "deleteBranch",
2748 params(
2749 ("branch" = String, Path, description = "Branch name to delete"),
2750 ),
2751 responses(
2752 (status = 200, description = "Branch deleted", body = BranchDeleteOutput),
2753 (status = 401, description = "Unauthorized", body = ErrorOutput),
2754 (status = 403, description = "Forbidden", body = ErrorOutput),
2755 (status = 404, description = "Branch not found", body = ErrorOutput),
2756 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2757 ),
2758 security(("bearer_token" = [])),
2759)]
2760async fn server_branch_delete(
2766 State(state): State<AppState>,
2767 Extension(handle): Extension<Arc<GraphHandle>>,
2768 actor: Option<Extension<ResolvedActor>>,
2769 Path(BranchPath { branch }): Path<BranchPath>,
2770) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
2771 let actor_arc = actor
2772 .as_ref()
2773 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2774 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2775 let actor_id = actor
2776 .as_ref()
2777 .map(|Extension(actor)| actor.actor_id.as_ref());
2778 authorize_request(
2779 actor.as_ref().map(|Extension(actor)| actor),
2780 handle.policy.as_deref(),
2781 PolicyRequest {
2782 action: PolicyAction::BranchDelete,
2783 branch: None,
2784 target_branch: Some(branch.clone()),
2785 },
2786 )?;
2787 let _admission = state
2789 .workload
2790 .try_admit(&actor_arc, 256)
2791 .map_err(ApiError::from_workload_reject)?;
2792 {
2793 let db = &handle.engine;
2794 db.branch_delete_as(&branch, actor_id)
2795 .await
2796 .map_err(ApiError::from_omni)?;
2797 }
2798 Ok(Json(BranchDeleteOutput {
2799 uri: handle.uri.clone(),
2800 name: branch,
2801 actor_id: actor_id.map(str::to_string),
2802 }))
2803}
2804
2805#[utoipa::path(
2806 post,
2807 path = "/branches/merge",
2808 tag = "branches",
2809 operation_id = "mergeBranches",
2810 request_body = BranchMergeRequest,
2811 responses(
2812 (status = 200, description = "Branches merged", body = BranchMergeOutput),
2813 (status = 400, description = "Bad request", body = ErrorOutput),
2814 (status = 401, description = "Unauthorized", body = ErrorOutput),
2815 (status = 403, description = "Forbidden", body = ErrorOutput),
2816 (status = 409, description = "Merge conflict", body = ErrorOutput),
2817 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2818 ),
2819 security(("bearer_token" = [])),
2820)]
2821async fn server_branch_merge(
2828 State(state): State<AppState>,
2829 Extension(handle): Extension<Arc<GraphHandle>>,
2830 actor: Option<Extension<ResolvedActor>>,
2831 Json(request): Json<BranchMergeRequest>,
2832) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
2833 let target = request.target.unwrap_or_else(|| "main".to_string());
2834 let actor_arc = actor
2835 .as_ref()
2836 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2837 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2838 let actor_id = actor
2839 .as_ref()
2840 .map(|Extension(actor)| actor.actor_id.as_ref());
2841 authorize_request(
2842 actor.as_ref().map(|Extension(actor)| actor),
2843 handle.policy.as_deref(),
2844 PolicyRequest {
2845 action: PolicyAction::BranchMerge,
2846 branch: Some(request.source.clone()),
2847 target_branch: Some(target.clone()),
2848 },
2849 )?;
2850 let _admission = state
2854 .workload
2855 .try_admit(&actor_arc, 256)
2856 .map_err(ApiError::from_workload_reject)?;
2857 let outcome = {
2858 let db = &handle.engine;
2859 db.branch_merge_as(&request.source, &target, actor_id)
2860 .await
2861 .map_err(ApiError::from_omni)?
2862 };
2863 Ok(Json(BranchMergeOutput {
2864 source: request.source,
2865 target,
2866 outcome: outcome.into(),
2867 actor_id: actor_id.map(str::to_string),
2868 }))
2869}
2870
2871#[utoipa::path(
2872 get,
2873 path = "/commits",
2874 tag = "commits",
2875 operation_id = "listCommits",
2876 params(CommitListQuery),
2877 responses(
2878 (status = 200, description = "List of commits", body = CommitListOutput),
2879 (status = 401, description = "Unauthorized", body = ErrorOutput),
2880 (status = 403, description = "Forbidden", body = ErrorOutput),
2881 ),
2882 security(("bearer_token" = [])),
2883)]
2884async fn server_commit_list(
2889 Extension(handle): Extension<Arc<GraphHandle>>,
2890 actor: Option<Extension<ResolvedActor>>,
2891 Query(query): Query<CommitListQuery>,
2892) -> std::result::Result<Json<CommitListOutput>, ApiError> {
2893 authorize_request(
2894 actor.as_ref().map(|Extension(actor)| actor),
2895 handle.policy.as_deref(),
2896 PolicyRequest {
2897 action: PolicyAction::Read,
2898 branch: query.branch.clone(),
2899 target_branch: None,
2900 },
2901 )?;
2902 let commits = {
2903 let db = &handle.engine;
2904 db.list_commits(query.branch.as_deref())
2905 .await
2906 .map_err(ApiError::from_omni)?
2907 };
2908 Ok(Json(CommitListOutput {
2909 commits: commits.iter().map(api::commit_output).collect(),
2910 }))
2911}
2912
2913#[derive(Deserialize)]
2916struct CommitPath {
2917 commit_id: String,
2918}
2919
2920#[utoipa::path(
2921 get,
2922 path = "/commits/{commit_id}",
2923 tag = "commits",
2924 operation_id = "getCommit",
2925 params(
2926 ("commit_id" = String, Path, description = "Commit identifier"),
2927 ),
2928 responses(
2929 (status = 200, description = "Commit details", body = api::CommitOutput),
2930 (status = 401, description = "Unauthorized", body = ErrorOutput),
2931 (status = 403, description = "Forbidden", body = ErrorOutput),
2932 (status = 404, description = "Commit not found", body = ErrorOutput),
2933 ),
2934 security(("bearer_token" = [])),
2935)]
2936
2937async fn server_commit_show(
2942 Extension(handle): Extension<Arc<GraphHandle>>,
2943 actor: Option<Extension<ResolvedActor>>,
2944 Path(CommitPath { commit_id }): Path<CommitPath>,
2945) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
2946 authorize_request(
2947 actor.as_ref().map(|Extension(actor)| actor),
2948 handle.policy.as_deref(),
2949 PolicyRequest {
2950 action: PolicyAction::Read,
2951 branch: None,
2952 target_branch: None,
2953 },
2954 )?;
2955 let commit = {
2956 let db = &handle.engine;
2957 db.get_commit(&commit_id)
2958 .await
2959 .map_err(ApiError::from_omni)?
2960 };
2961 Ok(Json(api::commit_output(&commit)))
2962}
2963
2964fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
2965 if let Some(snapshot) = snapshot {
2966 ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
2967 } else {
2968 ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
2969 }
2970}
2971
2972fn select_named_query_decl(
2973 query_source: &str,
2974 requested_name: Option<&str>,
2975) -> Result<omnigraph_compiler::query::ast::QueryDecl> {
2976 let parsed = parse_query(query_source)?;
2977 let query = if let Some(name) = requested_name {
2978 parsed
2979 .queries
2980 .into_iter()
2981 .find(|query| query.name == name)
2982 .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
2983 } else if parsed.queries.len() == 1 {
2984 parsed.queries.into_iter().next().unwrap()
2985 } else {
2986 bail!("query file contains multiple queries; pass --name");
2987 };
2988 Ok(query)
2989}
2990
2991fn select_named_query(
2992 query_source: &str,
2993 requested_name: Option<&str>,
2994) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
2995 let query = select_named_query_decl(query_source, requested_name)?;
2996 Ok((query.name, query.params))
2997}
2998
2999fn query_params_from_json(
3000 query_params: &[omnigraph_compiler::query::ast::Param],
3001 params_json: Option<&Value>,
3002) -> Result<ParamMap> {
3003 json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
3004 .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
3005}
3006
3007fn normalize_bearer_token(value: Option<String>) -> Option<String> {
3008 value
3009 .map(|value| value.trim().to_string())
3010 .filter(|value| !value.is_empty())
3011}
3012
3013fn normalize_bearer_actor(value: String) -> Result<String> {
3014 let value = value.trim().to_string();
3015 if value.is_empty() {
3016 bail!("bearer token actor names must not be blank");
3017 }
3018 Ok(value)
3019}
3020
3021fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
3022 let entries: HashMap<String, String> = serde_json::from_str(value)
3023 .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
3024 Ok(entries.into_iter().collect())
3025}
3026
3027fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
3028 let contents = fs::read_to_string(path)
3029 .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
3030 parse_bearer_tokens_json(&contents)
3031 .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
3032}
3033
3034fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
3035 let mut seen_actors = HashSet::new();
3036 let mut seen_tokens = HashSet::new();
3037 let mut normalized = Vec::with_capacity(entries.len());
3038
3039 for (actor, token) in entries {
3040 let actor = normalize_bearer_actor(actor)?;
3041 let Some(token) = normalize_bearer_token(Some(token)) else {
3042 bail!("bearer token for actor '{actor}' must not be blank");
3043 };
3044 if !seen_actors.insert(actor.clone()) {
3045 bail!("duplicate bearer token actor '{actor}'");
3046 }
3047 if !seen_tokens.insert(token.clone()) {
3048 bail!("duplicate bearer token value configured");
3049 }
3050 normalized.push((actor, token));
3051 }
3052
3053 normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
3054 Ok(normalized)
3055}
3056
3057fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
3058 let mut entries = Vec::new();
3059
3060 if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
3061 {
3062 entries.push(("default".to_string(), token));
3063 }
3064
3065 if let Some(path) =
3066 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
3067 {
3068 entries.extend(read_bearer_tokens_file(&path)?);
3069 } else if let Some(json) =
3070 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
3071 {
3072 entries.extend(parse_bearer_tokens_json(&json)?);
3073 }
3074
3075 validate_bearer_tokens(entries)
3076}
3077
3078#[cfg(test)]
3079mod tests {
3080 use super::{
3081 GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
3082 classify_server_runtime_state, hash_bearer_token, load_server_settings,
3083 normalize_bearer_token, parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
3084 };
3085 use serial_test::serial;
3086 use std::env;
3087 use std::fs;
3088 use tempfile::tempdir;
3089
3090 #[test]
3094 fn authorize_splits_decision_from_operational_error() {
3095 use super::{Authz, PolicyAction, PolicyCompiler, PolicyConfig, PolicyRequest, ResolvedActor, authorize};
3096 use std::sync::Arc;
3097
3098 fn req(action: PolicyAction) -> PolicyRequest {
3099 PolicyRequest { action, branch: None, target_branch: None }
3100 }
3101 let actor = ResolvedActor::cluster_static(Arc::from("act-alice"));
3102
3103 assert!(matches!(
3106 authorize(Some(&actor), None, req(PolicyAction::GraphList)).unwrap(),
3107 Authz::Denied(_)
3108 ));
3109 assert!(matches!(
3111 authorize(Some(&actor), None, req(PolicyAction::Change)).unwrap(),
3112 Authz::Denied(_)
3113 ));
3114 assert!(matches!(
3116 authorize(Some(&actor), None, req(PolicyAction::Read)).unwrap(),
3117 Authz::Allowed
3118 ));
3119 assert!(matches!(
3121 authorize(None, None, req(PolicyAction::Read)).unwrap(),
3122 Authz::Allowed
3123 ));
3124
3125 let policy: PolicyConfig = serde_yaml::from_str(
3127 "version: 1\n\
3128 groups:\n team: [act-alice]\n\
3129 rules:\n - id: team-read\n allow:\n actors: { group: team }\n actions: [read]\n branch_scope: any\n",
3130 )
3131 .unwrap();
3132 let engine = PolicyCompiler::compile(&policy, "graph").unwrap();
3133
3134 assert!(matches!(
3136 authorize(
3137 Some(&actor),
3138 Some(&engine),
3139 PolicyRequest { action: PolicyAction::Read, branch: Some("main".to_string()), target_branch: None },
3140 )
3141 .unwrap(),
3142 Authz::Allowed
3143 ));
3144 match authorize(
3146 Some(&actor),
3147 Some(&engine),
3148 PolicyRequest { action: PolicyAction::Change, branch: Some("main".to_string()), target_branch: None },
3149 )
3150 .unwrap()
3151 {
3152 Authz::Denied(message) => assert!(!message.is_empty(), "a deny carries its decision message"),
3153 Authz::Allowed => panic!("change must be denied: only read is allowed"),
3154 }
3155 assert!(
3159 authorize(None, Some(&engine), req(PolicyAction::Read)).is_err(),
3160 "a missing actor with a policy installed is an operational error, not a deny"
3161 );
3162 }
3163
3164 #[test]
3165 fn hash_bearer_token_produces_32_byte_output() {
3166 let hash = hash_bearer_token("any-token");
3167 assert_eq!(hash.len(), 32);
3168 }
3169
3170 #[test]
3176 fn validate_and_attach_gates_on_schema_and_collapses_empty() {
3177 use crate::queries::{QueryRegistry, RegistrySpec};
3178 use omnigraph_compiler::catalog::build_catalog;
3179 use omnigraph_compiler::schema::parser::parse_schema;
3180
3181 let schema = parse_schema("node User {\nname: String\n}\n").unwrap();
3182 let catalog = build_catalog(&schema).unwrap();
3183 let spec = |name: &str, source: &str| RegistrySpec {
3184 name: name.to_string(),
3185 source: source.to_string(),
3186 expose: false,
3187 tool_name: None,
3188 };
3189
3190 let empty =
3192 super::validate_and_attach(QueryRegistry::default(), &catalog, "g").unwrap();
3193 assert!(empty.is_none());
3194
3195 let ok = QueryRegistry::from_specs(vec![spec(
3197 "find_user",
3198 "query find_user() { match { $u: User } return { $u.name } }",
3199 )])
3200 .unwrap();
3201 assert!(super::validate_and_attach(ok, &catalog, "g").unwrap().is_some());
3202
3203 let broken = QueryRegistry::from_specs(vec![spec(
3206 "ghost",
3207 "query ghost() { match { $w: Widget } return { $w.name } }",
3208 )])
3209 .unwrap();
3210 let err = super::validate_and_attach(broken, &catalog, "graph-x").unwrap_err();
3211 let msg = err.to_string();
3212 assert!(msg.contains("graph-x"), "labels the graph: {msg}");
3213 assert!(msg.contains("ghost"), "names the query: {msg}");
3214 assert!(msg.contains("schema check"), "mentions the schema check: {msg}");
3215 }
3216
3217 #[test]
3218 fn hash_bearer_token_is_deterministic() {
3219 assert_eq!(
3220 hash_bearer_token("stable-input"),
3221 hash_bearer_token("stable-input"),
3222 );
3223 }
3224
3225 #[test]
3226 fn hash_bearer_token_differs_for_different_inputs() {
3227 assert_ne!(hash_bearer_token("token-a"), hash_bearer_token("token-b"));
3228 }
3229
3230 #[test]
3231 fn hash_bearer_token_matches_known_sha256_vector() {
3232 let hash = hash_bearer_token("abc");
3234 let hex: String = hash.iter().map(|b| format!("{:02x}", b)).collect();
3235 assert_eq!(
3236 hex,
3237 "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
3238 );
3239 }
3240
3241 #[test]
3242 fn server_settings_load_from_yaml_config() {
3243 let temp = tempdir().unwrap();
3244 let config = temp.path().join("omnigraph.yaml");
3245 fs::write(
3246 &config,
3247 r#"
3248graphs:
3249 local:
3250 uri: /tmp/demo.omni
3251server:
3252 graph: local
3253 bind: 0.0.0.0:9090
3254"#,
3255 )
3256 .unwrap();
3257
3258 let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
3259 match &settings.mode {
3260 ServerConfigMode::Single { uri, graph_id, .. } => {
3261 assert_eq!(uri, "/tmp/demo.omni");
3262 assert_eq!(graph_id, "local");
3263 }
3264 ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3265 }
3266 assert_eq!(settings.bind, "0.0.0.0:9090");
3267 }
3268
3269 #[test]
3270 fn server_settings_cli_flags_override_yaml_config() {
3271 let temp = tempdir().unwrap();
3272 let config = temp.path().join("omnigraph.yaml");
3273 fs::write(
3274 &config,
3275 r#"
3276graphs:
3277 local:
3278 uri: /tmp/demo.omni
3279server:
3280 graph: local
3281 bind: 127.0.0.1:8080
3282"#,
3283 )
3284 .unwrap();
3285
3286 let settings = load_server_settings(
3287 Some(&config),
3288 Some("/tmp/override.omni".to_string()),
3289 None,
3290 Some("0.0.0.0:9999".to_string()),
3291 false,
3292 )
3293 .unwrap();
3294 match &settings.mode {
3295 ServerConfigMode::Single { uri, graph_id, .. } => {
3296 assert_eq!(uri, "/tmp/override.omni");
3297 assert_eq!(graph_id, "/tmp/override.omni");
3298 }
3299 ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3300 }
3301 assert_eq!(settings.bind, "0.0.0.0:9999");
3302 }
3303
3304 #[test]
3305 fn server_settings_can_resolve_named_target() {
3306 let temp = tempdir().unwrap();
3307 let config = temp.path().join("omnigraph.yaml");
3308 fs::write(
3309 &config,
3310 r#"
3311graphs:
3312 local:
3313 uri: ./demo.omni
3314 dev:
3315 uri: http://127.0.0.1:8080
3316server:
3317 graph: local
3318 bind: 127.0.0.1:8080
3319"#,
3320 )
3321 .unwrap();
3322
3323 let settings =
3324 load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
3325 .unwrap();
3326 match &settings.mode {
3327 ServerConfigMode::Single { uri, graph_id, .. } => {
3328 assert_eq!(uri, "http://127.0.0.1:8080");
3329 assert_eq!(graph_id, "dev");
3330 }
3331 ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
3332 }
3333 }
3334
3335 #[test]
3336 fn server_settings_require_uri_from_cli_or_config() {
3337 let error = load_server_settings(None, None, None, None, false).unwrap_err();
3338 assert!(
3339 error.to_string().contains("no graph to serve"),
3340 "expected mode-inference error, got: {error}",
3341 );
3342 }
3343
3344 #[test]
3345 fn classify_open_requires_explicit_unauthenticated_flag() {
3346 let error = classify_server_runtime_state(false, false, false).unwrap_err();
3348 let msg = error.to_string();
3349 assert!(
3350 msg.contains("--unauthenticated"),
3351 "expected refusal message mentioning --unauthenticated, got: {msg}"
3352 );
3353
3354 assert_eq!(
3356 classify_server_runtime_state(false, false, true).unwrap(),
3357 ServerRuntimeState::Open
3358 );
3359 }
3360
3361 #[test]
3362 fn classify_tokens_without_policy_is_default_deny() {
3363 assert_eq!(
3367 classify_server_runtime_state(true, false, false).unwrap(),
3368 ServerRuntimeState::DefaultDeny
3369 );
3370 assert_eq!(
3371 classify_server_runtime_state(true, false, true).unwrap(),
3372 ServerRuntimeState::DefaultDeny
3373 );
3374 }
3375
3376 #[tokio::test]
3377 #[serial]
3378 async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
3379 let _guard = EnvGuard::set(&[
3388 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
3389 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
3390 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3391 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
3392 ("OMNIGRAPH_UNAUTHENTICATED", None),
3393 ]);
3394 let temp = tempdir().unwrap();
3395 let policy_path = temp.path().join("server-policy.yaml");
3400 let config = ServerConfig {
3401 mode: ServerConfigMode::Multi {
3402 graphs: vec![GraphStartupConfig {
3403 graph_id: "alpha".to_string(),
3404 uri: temp
3405 .path()
3406 .join("alpha.omni")
3407 .to_string_lossy()
3408 .into_owned(),
3409 policy_file: None,
3410 queries: crate::queries::QueryRegistry::default(),
3411 }],
3412 config_path: temp.path().join("omnigraph.yaml"),
3413 server_policy_file: Some(policy_path),
3414 },
3415 bind: "127.0.0.1:0".to_string(),
3416 allow_unauthenticated: false,
3417 };
3418 let result = serve(config).await;
3419 let err = result
3420 .expect_err("serve should refuse to start in multi mode with policy but no tokens");
3421 let msg = format!("{:?}", err);
3422 assert!(
3423 msg.contains("policy file is configured but no bearer tokens"),
3424 "expected policy-without-tokens rejection in multi mode, got: {msg}",
3425 );
3426 }
3427
3428 #[tokio::test]
3429 #[serial]
3430 async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
3431 let _guard = EnvGuard::set(&[
3442 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
3443 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
3444 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3445 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
3446 ("OMNIGRAPH_UNAUTHENTICATED", None),
3447 ]);
3448 let temp = tempdir().unwrap();
3449 let config = ServerConfig {
3452 mode: ServerConfigMode::Single {
3453 uri: temp
3454 .path()
3455 .join("graph.omni")
3456 .to_string_lossy()
3457 .into_owned(),
3458 graph_id: "default".to_string(),
3459 policy_file: None,
3460 queries: crate::queries::QueryRegistry::default(),
3461 },
3462 bind: "127.0.0.1:0".to_string(),
3463 allow_unauthenticated: false,
3464 };
3465 let result = serve(config).await;
3466 let err =
3467 result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
3468 let msg = format!("{:?}", err);
3469 assert!(
3470 msg.contains("no bearer tokens") || msg.contains("policy file"),
3471 "expected refusal message naming the misconfiguration, got: {msg}",
3472 );
3473 }
3474
3475 #[test]
3476 #[serial]
3477 fn unauthenticated_env_var_classification() {
3478 let temp = tempdir().unwrap();
3488 let config_path = temp.path().join("omnigraph.yaml");
3489 fs::write(
3490 &config_path,
3491 r#"
3492graphs:
3493 local:
3494 uri: /tmp/demo-unauth.omni
3495server:
3496 graph: local
3497"#,
3498 )
3499 .unwrap();
3500
3501 for value in ["1", "true", "yes", "TRUE", "anything"] {
3503 let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
3504 let settings = load_server_settings(Some(&config_path), None, None, None, false)
3505 .expect("settings load should succeed");
3506 assert!(
3507 settings.allow_unauthenticated,
3508 "OMNIGRAPH_UNAUTHENTICATED={value:?} should enable Open mode",
3509 );
3510 }
3511
3512 for value in ["0", "false", "FALSE", ""] {
3514 let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
3515 let settings = load_server_settings(Some(&config_path), None, None, None, false)
3516 .expect("settings load should succeed");
3517 assert!(
3518 !settings.allow_unauthenticated,
3519 "OMNIGRAPH_UNAUTHENTICATED={value:?} should NOT enable Open mode",
3520 );
3521 }
3522
3523 let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
3525 let settings = load_server_settings(Some(&config_path), None, None, None, false)
3526 .expect("settings load should succeed");
3527 assert!(
3528 !settings.allow_unauthenticated,
3529 "OMNIGRAPH_UNAUTHENTICATED unset should NOT enable Open mode",
3530 );
3531 drop(_guard);
3532
3533 let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
3536 let settings = load_server_settings(Some(&config_path), None, None, None, true)
3537 .expect("settings load should succeed");
3538 assert!(
3539 settings.allow_unauthenticated,
3540 "--unauthenticated CLI flag should win even when env is falsy",
3541 );
3542 }
3543
3544 #[test]
3545 fn classify_policy_enabled_requires_tokens() {
3546 assert_eq!(
3550 classify_server_runtime_state(true, true, false).unwrap(),
3551 ServerRuntimeState::PolicyEnabled
3552 );
3553 assert_eq!(
3554 classify_server_runtime_state(true, true, true).unwrap(),
3555 ServerRuntimeState::PolicyEnabled
3556 );
3557 }
3558
3559 #[test]
3560 fn classify_policy_without_tokens_is_rejected() {
3561 for allow_unauthenticated in [false, true] {
3568 let err =
3569 classify_server_runtime_state(false, true, allow_unauthenticated).unwrap_err();
3570 let msg = err.to_string();
3571 assert!(
3572 msg.contains("policy file is configured but no bearer tokens"),
3573 "expected policy-without-tokens rejection message; got: {msg}"
3574 );
3575 assert!(
3576 msg.contains("every request would 401"),
3577 "rejection message must name the failure mode; got: {msg}"
3578 );
3579 }
3580 }
3581
3582 #[test]
3583 fn normalize_bearer_token_trims_and_filters_blank_values() {
3584 assert_eq!(normalize_bearer_token(None), None);
3585 assert_eq!(normalize_bearer_token(Some(" ".to_string())), None);
3586 assert_eq!(
3587 normalize_bearer_token(Some(" demo-token ".to_string())).as_deref(),
3588 Some("demo-token")
3589 );
3590 }
3591
3592 struct EnvGuard {
3593 saved: Vec<(&'static str, Option<String>)>,
3594 }
3595
3596 impl EnvGuard {
3597 fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
3598 let saved = vars
3599 .iter()
3600 .map(|(name, _)| (*name, env::var(name).ok()))
3601 .collect::<Vec<_>>();
3602 for (name, value) in vars {
3603 unsafe {
3604 match value {
3605 Some(value) => env::set_var(name, value),
3606 None => env::remove_var(name),
3607 }
3608 }
3609 }
3610 Self { saved }
3611 }
3612 }
3613
3614 impl Drop for EnvGuard {
3615 fn drop(&mut self) {
3616 for (name, value) in self.saved.drain(..) {
3617 unsafe {
3618 match value {
3619 Some(value) => env::set_var(name, value),
3620 None => env::remove_var(name),
3621 }
3622 }
3623 }
3624 }
3625 }
3626
3627 #[test]
3628 fn parse_bearer_tokens_json_reads_actor_token_map() {
3629 let tokens = parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
3630 assert_eq!(tokens.len(), 2);
3631 assert!(tokens.contains(&("alice".to_string(), " token-a ".to_string())));
3632 assert!(tokens.contains(&("bob".to_string(), "token-b".to_string())));
3633 }
3634
3635 #[test]
3636 #[serial]
3637 fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
3638 let temp = tempdir().unwrap();
3639 let tokens_path = temp.path().join("tokens.json");
3640 fs::write(
3641 &tokens_path,
3642 r#"{"team-01":"token-one","team-02":"token-two"}"#,
3643 )
3644 .unwrap();
3645
3646 let _guard = EnvGuard::set(&[
3647 ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
3648 (
3649 "OMNIGRAPH_SERVER_BEARER_TOKENS_FILE",
3650 Some(tokens_path.to_str().unwrap()),
3651 ),
3652 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
3653 ]);
3654
3655 let tokens = server_bearer_tokens_from_env().unwrap();
3656 assert_eq!(
3657 tokens,
3658 vec![
3659 ("default".to_string(), "legacy-token".to_string()),
3660 ("team-01".to_string(), "token-one".to_string()),
3661 ("team-02".to_string(), "token-two".to_string()),
3662 ]
3663 );
3664 }
3665}