1pub mod api;
2pub mod auth;
3pub mod config;
4pub mod graph_id;
5pub mod identity;
6pub mod policy;
7pub mod registry;
8pub mod workload;
9
10pub use graph_id::GraphId;
11pub use identity::{AuthSource, GraphKey, ResolvedActor, Scope, TenantId};
12pub use registry::{GraphHandle, GraphRegistry, InsertError, RegistryLookup, RegistrySnapshot};
13
14use std::collections::{HashMap, HashSet};
15use std::fs;
16use std::io;
17use std::io::Write;
18use std::path::PathBuf;
19use std::sync::Arc;
20
21use api::{
22 BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
23 BranchMergeOutput, BranchMergeRequest, ChangeOutput, ChangeRequest, CommitListOutput,
24 CommitListQuery, ErrorCode, ErrorOutput, ExportRequest, GraphInfo, GraphListResponse,
25 HealthOutput, IngestOutput, IngestRequest, QueryRequest, ReadOutput, ReadRequest,
26 SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotQuery, ingest_output,
27 schema_apply_output, snapshot_payload,
28};
29pub use auth::{AWS_SECRET_ENV, EnvOrFileTokenSource, TokenSource, resolve_token_source};
30use axum::body::{Body, Bytes};
31use axum::extract::DefaultBodyLimit;
32use axum::extract::{Extension, OriginalUri, Path, Query, Request, State};
33use axum::http::StatusCode;
34use axum::http::header::{AUTHORIZATION, CONTENT_TYPE, HeaderName, HeaderValue};
35use axum::middleware::{self, Next};
36use axum::response::{IntoResponse, Response};
37use axum::routing::{delete, get, post};
38use axum::{Json, Router};
39use color_eyre::eyre::{Result, WrapErr, bail};
40pub use config::{
41 AliasCommand, AliasConfig, CliDefaults, DEFAULT_CONFIG_FILE, OmnigraphConfig, PolicySettings,
42 ProjectConfig, QueryDefaults, ReadOutputFormat, ServerDefaults, TableCellLayout, TargetConfig,
43 load_config,
44};
45use futures::stream;
46use omnigraph::db::{Omnigraph, ReadTarget};
47use omnigraph::error::{ManifestConflictDetails, ManifestErrorKind, OmniError};
48use omnigraph::storage::normalize_root_uri;
49use omnigraph_compiler::json_params_to_param_map;
50use omnigraph_compiler::query::parser::parse_query;
51use omnigraph_compiler::{JsonParamMode, ParamMap};
52pub use policy::{
53 PolicyAction, PolicyCompiler, PolicyConfig, PolicyDecision, PolicyEngine, PolicyExpectation,
54 PolicyRequest, PolicyResourceKind, PolicyTestConfig,
55};
56use serde::Deserialize;
57use serde_json::Value;
58use sha2::{Digest, Sha256};
59use subtle::ConstantTimeEq;
60use tokio::net::TcpListener;
61use tokio::sync::mpsc;
62use tower_http::trace::TraceLayer;
63use tracing::{error, info, warn};
64use tracing_subscriber::EnvFilter;
65use utoipa::OpenApi;
66use utoipa::openapi::path::{Parameter, ParameterIn};
67use utoipa::openapi::schema::{Object, Type};
68use utoipa::openapi::security::{Http, HttpAuthScheme, SecurityScheme};
69
70type BearerTokenHash = [u8; 32];
71
72fn hash_bearer_token(token: &str) -> BearerTokenHash {
73 let digest = Sha256::digest(token.as_bytes());
74 let mut out = [0u8; 32];
75 out.copy_from_slice(&digest);
76 out
77}
78
// Root of the generated OpenAPI document. `server_openapi` obtains the
// document via `ApiDoc::openapi()` and post-processes it (security stripping,
// multi-graph path nesting) before serving it.
//
// `paths(...)` enumerates the handlers included in the document; the
// `SecurityAddon` modifier registers the `bearer_token` security scheme.
#[derive(OpenApi)]
#[openapi(
    info(
        title = "Omnigraph API",
        description = "HTTP API for the Omnigraph graph database",
    ),
    paths(
        server_health,
        server_graphs_list,
        server_snapshot,
        #[allow(deprecated)] server_read,
        server_query,
        server_export,
        #[allow(deprecated)] server_change,
        server_mutate,
        server_schema_apply,
        server_schema_get,
        server_ingest,
        server_branch_list,
        server_branch_create,
        server_branch_delete,
        server_branch_merge,
        server_commit_list,
        server_commit_show,
    ),
    modifiers(&SecurityAddon),
)]
pub struct ApiDoc;
109
110struct SecurityAddon;
111
112impl utoipa::Modify for SecurityAddon {
113 fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
114 openapi
115 .components
116 .get_or_insert_with(Default::default)
117 .add_security_scheme(
118 "bearer_token",
119 SecurityScheme::Http(Http::new(HttpAuthScheme::Bearer)),
120 );
121 }
122}
123
/// Request body limit (1 MiB) applied to the whole router in `build_app`.
const DEFAULT_REQUEST_BODY_LIMIT_BYTES: usize = 1_048_576;
/// Larger body limit (32 MiB) applied only to the `/ingest` route.
const INGEST_REQUEST_BODY_LIMIT_BYTES: usize = 32 * 1024 * 1024;
/// Crate version captured at compile time; reported by `/healthz`.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Optional build-time `OMNIGRAPH_SOURCE_VERSION` value; `None` when the
/// variable was not set while compiling. Reported by `/healthz`.
const SERVER_SOURCE_VERSION: Option<&str> = option_env!("OMNIGRAPH_SOURCE_VERSION");
128
/// Resolved startup settings, produced by `load_server_settings` and consumed
/// by `serve`.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Whether the server hosts a single graph or a set of graphs.
    pub mode: ServerConfigMode,
    /// Address handed to `TcpListener::bind` in `serve`.
    pub bind: String,
    /// Explicit opt-in to running with neither bearer tokens nor policy;
    /// set from the CLI flag or the `OMNIGRAPH_UNAUTHENTICATED` env var.
    pub allow_unauthenticated: bool,
}
148
/// Which graphs the server should open at startup.
#[derive(Debug, Clone)]
pub enum ServerConfigMode {
    /// Serve exactly one graph.
    Single {
        /// Graph URI (already passed through `normalize_root_uri` by
        /// `load_server_settings`).
        uri: String,
        /// Optional policy file loaded for the graph.
        policy_file: Option<PathBuf>,
    },
    /// Serve every graph listed in the config file's `graphs:` map.
    Multi {
        /// One entry per configured graph.
        graphs: Vec<GraphStartupConfig>,
        /// Path of the config file the graphs were read from.
        config_path: PathBuf,
        /// Optional server-level policy file (loaded with
        /// `PolicyEngine::load_server`).
        server_policy_file: Option<PathBuf>,
    },
}
179
/// Startup description of one graph in multi-graph mode.
#[derive(Debug, Clone)]
pub struct GraphStartupConfig {
    /// Graph identifier; validated with `GraphId::try_from` before use.
    pub graph_id: String,
    /// Storage URI of the graph.
    pub uri: String,
    /// Optional per-graph policy file.
    pub policy_file: Option<PathBuf>,
}
189
/// How requests are mapped to graph handles. `build_app` uses the variant to
/// decide between flat routes and routes nested under `/graphs/{graph_id}`.
#[derive(Clone)]
pub enum GraphRouting {
    /// A single graph served at the router root.
    Single { handle: Arc<GraphHandle> },
    /// Several graphs looked up through a registry.
    Multi {
        /// Registry holding the opened graph handles.
        registry: Arc<GraphRegistry>,
        /// Config file the registry was built from, when known.
        config_path: Option<PathBuf>,
    },
}
221
/// Shared state handed to every handler and middleware.
#[derive(Clone)]
pub struct AppState {
    /// Single- or multi-graph routing table.
    routing: GraphRouting,
    /// Workload controller shared across requests.
    workload: Arc<workload::WorkloadController>,
    /// `(SHA-256 token hash, actor name)` pairs built by `hash_bearer_tokens`.
    bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
    /// Server-level policy; consulted by `server_graphs_list`. Always `None`
    /// for states built through `build_single_mode`.
    server_policy: Option<Arc<PolicyEngine>>,
}
243
244struct ExportStreamWriter {
245 sender: mpsc::UnboundedSender<std::result::Result<Bytes, io::Error>>,
246}
247
248impl Write for ExportStreamWriter {
249 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
250 self.sender
251 .send(Ok(Bytes::copy_from_slice(buf)))
252 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "export stream closed"))?;
253 Ok(buf.len())
254 }
255
256 fn flush(&mut self) -> io::Result<()> {
257 Ok(())
258 }
259}
260
/// Error type returned by handlers; rendered as an `ErrorOutput` JSON body by
/// its `IntoResponse` implementation.
#[derive(Debug)]
pub struct ApiError {
    /// HTTP status of the response.
    status: StatusCode,
    /// Machine-readable error code placed in the body.
    code: ErrorCode,
    /// Human-readable message placed in the body's `error` field.
    message: String,
    /// Per-conflict details; empty unless built by `merge_conflict`.
    merge_conflicts: Vec<api::MergeConflictOutput>,
    /// Version-mismatch details; set only by `manifest_version_conflict`.
    manifest_conflict: Option<api::ManifestConflictOutput>,
}
269
270impl AppState {
271 pub fn new_single(
280 uri: String,
281 db: Omnigraph,
282 bearer_tokens: Vec<(String, String)>,
283 policy_engine: Option<PolicyEngine>,
284 workload: workload::WorkloadController,
285 ) -> Self {
286 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
287 let per_graph_policy = policy_engine.map(Arc::new);
288 Self::build_single_mode(uri, db, bearer_tokens, per_graph_policy, Arc::new(workload))
289 }
290
291 pub fn new(uri: String, db: Omnigraph) -> Self {
292 Self::new_single(
293 uri,
294 db,
295 Vec::new(),
296 None,
297 workload::WorkloadController::from_env(),
298 )
299 }
300
301 pub fn new_with_bearer_token(uri: String, db: Omnigraph, bearer_token: Option<String>) -> Self {
302 let bearer_tokens = normalize_bearer_token(bearer_token)
303 .into_iter()
304 .map(|token| ("default".to_string(), token))
305 .collect();
306 Self::new_with_bearer_tokens(uri, db, bearer_tokens)
307 }
308
309 pub fn new_with_bearer_tokens(
310 uri: String,
311 db: Omnigraph,
312 bearer_tokens: Vec<(String, String)>,
313 ) -> Self {
314 Self::new_single(
315 uri,
316 db,
317 bearer_tokens,
318 None,
319 workload::WorkloadController::from_env(),
320 )
321 }
322
323 pub fn new_with_bearer_tokens_and_policy(
324 uri: String,
325 db: Omnigraph,
326 bearer_tokens: Vec<(String, String)>,
327 policy_engine: Option<PolicyEngine>,
328 ) -> Self {
329 Self::new_single(
330 uri,
331 db,
332 bearer_tokens,
333 policy_engine,
334 workload::WorkloadController::from_env(),
335 )
336 }
337
338 pub fn new_with_workload(
344 uri: String,
345 db: Omnigraph,
346 bearer_tokens: Vec<(String, String)>,
347 workload: workload::WorkloadController,
348 ) -> Self {
349 Self::new_single(uri, db, bearer_tokens, None, workload)
350 }
351
352 pub async fn open(uri: impl Into<String>) -> Result<Self> {
353 Self::open_with_bearer_token(uri, None).await
354 }
355
356 pub async fn open_with_bearer_token(
357 uri: impl Into<String>,
358 bearer_token: Option<String>,
359 ) -> Result<Self> {
360 let bearer_tokens = normalize_bearer_token(bearer_token)
361 .into_iter()
362 .map(|token| ("default".to_string(), token))
363 .collect();
364 Self::open_with_bearer_tokens(uri, bearer_tokens).await
365 }
366
367 pub async fn open_with_bearer_tokens(
368 uri: impl Into<String>,
369 bearer_tokens: Vec<(String, String)>,
370 ) -> Result<Self> {
371 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
372 let db = Omnigraph::open(&uri).await?;
373 Ok(Self::new_with_bearer_tokens(uri, db, bearer_tokens))
374 }
375
376 pub async fn open_with_bearer_tokens_and_policy(
377 uri: impl Into<String>,
378 bearer_tokens: Vec<(String, String)>,
379 policy_file: Option<&PathBuf>,
380 ) -> Result<Self> {
381 let uri = normalize_root_uri(&uri.into()).wrap_err("normalize graph URI")?;
387 let db = Omnigraph::open(&uri).await?;
388 let policy_engine = match policy_file {
389 Some(path) => Some(PolicyEngine::load_graph(path, &uri)?),
390 None => None,
391 };
392 Ok(Self::new_with_bearer_tokens_and_policy(
393 uri,
394 db,
395 bearer_tokens,
396 policy_engine,
397 ))
398 }
399
400 fn build_single_mode(
406 uri: String,
407 db: Omnigraph,
408 bearer_tokens: Arc<[(BearerTokenHash, Arc<str>)]>,
409 policy_engine: Option<Arc<PolicyEngine>>,
410 workload: Arc<workload::WorkloadController>,
411 ) -> Self {
412 let db = if let Some(policy) = policy_engine.as_ref() {
417 let checker = Arc::clone(policy) as Arc<dyn omnigraph_policy::PolicyChecker>;
418 db.with_policy(checker)
419 } else {
420 db
421 };
422 let uri = normalize_root_uri(&uri).unwrap_or(uri);
431 let key = GraphKey::cluster(
432 GraphId::try_from("default").expect("'default' is a valid GraphId log label"),
433 );
434 let handle = Arc::new(GraphHandle {
435 key,
436 uri,
437 engine: Arc::new(db),
438 policy: policy_engine,
439 });
440 Self {
441 routing: GraphRouting::Single { handle },
442 workload,
443 bearer_tokens,
444 server_policy: None,
445 }
446 }
447
448 pub fn new_multi(
456 handles: Vec<Arc<GraphHandle>>,
457 bearer_tokens: Vec<(String, String)>,
458 server_policy: Option<PolicyEngine>,
459 workload: workload::WorkloadController,
460 config_path: Option<PathBuf>,
461 ) -> std::result::Result<Self, InsertError> {
462 let bearer_tokens = hash_bearer_tokens(bearer_tokens);
463 let registry = Arc::new(GraphRegistry::from_handles(handles)?);
464 Ok(Self {
465 routing: GraphRouting::Multi {
466 registry,
467 config_path,
468 },
469 workload: Arc::new(workload),
470 bearer_tokens,
471 server_policy: server_policy.map(Arc::new),
472 })
473 }
474
475 pub fn routing(&self) -> &GraphRouting {
481 &self.routing
482 }
483
484 fn requires_bearer_auth(&self) -> bool {
485 if !self.bearer_tokens.is_empty() {
486 return true;
487 }
488 if self.server_policy.is_some() {
489 return true;
490 }
491 match &self.routing {
497 GraphRouting::Single { handle } => handle.policy.is_some(),
498 GraphRouting::Multi { registry, .. } => registry.snapshot_ref().any_per_graph_policy,
499 }
500 }
501
502 fn authenticate_bearer_token(&self, provided_token: &str) -> Option<ResolvedActor> {
503 let provided_hash = hash_bearer_token(provided_token);
507 let mut matched: Option<Arc<str>> = None;
508 for (hash, actor) in self.bearer_tokens.iter() {
509 if bool::from(hash.ct_eq(&provided_hash)) && matched.is_none() {
510 matched = Some(Arc::clone(actor));
511 }
512 }
513 matched.map(ResolvedActor::cluster_static)
514 }
515}
516
517fn hash_bearer_tokens(bearer_tokens: Vec<(String, String)>) -> Arc<[(BearerTokenHash, Arc<str>)]> {
518 let tokens: Vec<(BearerTokenHash, Arc<str>)> = bearer_tokens
519 .into_iter()
520 .map(|(actor, token)| (hash_bearer_token(&token), Arc::<str>::from(actor)))
521 .collect();
522 Arc::from(tokens)
523}
524
525impl ApiError {
526 pub fn unauthorized(message: impl Into<String>) -> Self {
527 Self {
528 status: StatusCode::UNAUTHORIZED,
529 code: ErrorCode::Unauthorized,
530 message: message.into(),
531 merge_conflicts: Vec::new(),
532 manifest_conflict: None,
533 }
534 }
535
536 pub fn forbidden(message: impl Into<String>) -> Self {
537 Self {
538 status: StatusCode::FORBIDDEN,
539 code: ErrorCode::Forbidden,
540 message: message.into(),
541 merge_conflicts: Vec::new(),
542 manifest_conflict: None,
543 }
544 }
545
546 pub fn bad_request(message: impl Into<String>) -> Self {
547 Self {
548 status: StatusCode::BAD_REQUEST,
549 code: ErrorCode::BadRequest,
550 message: message.into(),
551 merge_conflicts: Vec::new(),
552 manifest_conflict: None,
553 }
554 }
555
556 pub fn not_found(message: impl Into<String>) -> Self {
557 Self {
558 status: StatusCode::NOT_FOUND,
559 code: ErrorCode::NotFound,
560 message: message.into(),
561 merge_conflicts: Vec::new(),
562 manifest_conflict: None,
563 }
564 }
565
566 pub fn method_not_allowed(message: impl Into<String>) -> Self {
571 Self {
572 status: StatusCode::METHOD_NOT_ALLOWED,
573 code: ErrorCode::MethodNotAllowed,
574 message: message.into(),
575 merge_conflicts: Vec::new(),
576 manifest_conflict: None,
577 }
578 }
579
580 pub fn conflict(message: impl Into<String>) -> Self {
581 Self {
582 status: StatusCode::CONFLICT,
583 code: ErrorCode::Conflict,
584 message: message.into(),
585 merge_conflicts: Vec::new(),
586 manifest_conflict: None,
587 }
588 }
589
590 pub fn internal(message: impl Into<String>) -> Self {
591 Self {
592 status: StatusCode::INTERNAL_SERVER_ERROR,
593 code: ErrorCode::Internal,
594 message: message.into(),
595 merge_conflicts: Vec::new(),
596 manifest_conflict: None,
597 }
598 }
599
600 pub fn too_many_requests(message: impl Into<String>) -> Self {
605 Self {
606 status: StatusCode::TOO_MANY_REQUESTS,
607 code: ErrorCode::TooManyRequests,
608 message: message.into(),
609 merge_conflicts: Vec::new(),
610 manifest_conflict: None,
611 }
612 }
613
614 pub fn from_workload_reject(reject: workload::RejectReason) -> Self {
617 match reject {
618 workload::RejectReason::InFlightCountExceeded { .. }
619 | workload::RejectReason::ByteBudgetExceeded { .. } => {
620 Self::too_many_requests(reject.to_string())
621 }
622 }
623 }
624
625 fn merge_conflict(conflicts: Vec<api::MergeConflictOutput>) -> Self {
626 Self {
627 status: StatusCode::CONFLICT,
628 code: ErrorCode::Conflict,
629 message: summarize_merge_conflicts(&conflicts),
630 merge_conflicts: conflicts,
631 manifest_conflict: None,
632 }
633 }
634
635 fn manifest_version_conflict(message: String, details: api::ManifestConflictOutput) -> Self {
636 Self {
637 status: StatusCode::CONFLICT,
638 code: ErrorCode::Conflict,
639 message,
640 merge_conflicts: Vec::new(),
641 manifest_conflict: Some(details),
642 }
643 }
644
645 fn from_omni(err: OmniError) -> Self {
646 match err {
647 OmniError::Compiler(err) => Self::bad_request(err.to_string()),
648 OmniError::DataFusion(message) => Self::bad_request(format!("query: {message}")),
649 OmniError::Manifest(err) => match err.kind {
650 ManifestErrorKind::BadRequest => Self::bad_request(err.message),
651 ManifestErrorKind::NotFound => Self::not_found(err.message),
652 ManifestErrorKind::Conflict => match err.details {
653 Some(ManifestConflictDetails::ExpectedVersionMismatch {
654 table_key,
655 expected,
656 actual,
657 }) => Self::manifest_version_conflict(
658 err.message,
659 api::ManifestConflictOutput {
660 table_key,
661 expected,
662 actual,
663 },
664 ),
665 _ => Self::conflict(err.message),
666 },
667 ManifestErrorKind::Internal => Self::internal(err.message),
668 },
669 OmniError::MergeConflicts(conflicts) => Self::merge_conflict(
670 conflicts
671 .iter()
672 .map(api::MergeConflictOutput::from)
673 .collect(),
674 ),
675 OmniError::Lance(message) => Self::internal(format!("storage: {message}")),
676 OmniError::Io(err) => Self::internal(format!("io: {err}")),
677 OmniError::Policy(message) => Self::forbidden(message),
684 err @ OmniError::AlreadyInitialized { .. } => Self::conflict(err.to_string()),
690 }
691 }
692}
693
694fn summarize_merge_conflicts(conflicts: &[api::MergeConflictOutput]) -> String {
695 if conflicts.is_empty() {
696 return "merge conflicts".to_string();
697 }
698
699 let preview: Vec<String> = conflicts
700 .iter()
701 .take(3)
702 .map(|conflict| match conflict.row_id.as_deref() {
703 Some(row_id) => format!(
704 "{}:{} ({})",
705 conflict.table_key,
706 row_id,
707 conflict.kind.as_str()
708 ),
709 None => format!("{} ({})", conflict.table_key, conflict.kind.as_str()),
710 })
711 .collect();
712
713 let suffix = if conflicts.len() > preview.len() {
714 format!("; and {} more", conflicts.len() - preview.len())
715 } else {
716 String::new()
717 };
718
719 format!("merge conflicts: {}{}", preview.join("; "), suffix)
720}
721
722const RETRY_AFTER_SECONDS: &str = "60";
724
725impl IntoResponse for ApiError {
726 fn into_response(self) -> Response {
727 let mut headers = axum::http::HeaderMap::new();
728 if matches!(self.code, ErrorCode::TooManyRequests) {
729 headers.insert(
730 axum::http::header::RETRY_AFTER,
731 axum::http::HeaderValue::from_static(RETRY_AFTER_SECONDS),
732 );
733 }
734 (
735 self.status,
736 headers,
737 Json(ErrorOutput {
738 error: self.message,
739 code: Some(self.code),
740 merge_conflicts: self.merge_conflicts,
741 manifest_conflict: self.manifest_conflict,
742 }),
743 )
744 .into_response()
745 }
746}
747
748pub fn init_tracing() {
749 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
750 let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
751}
752
753pub fn load_server_settings(
754 config_path: Option<&PathBuf>,
755 cli_uri: Option<String>,
756 cli_target: Option<String>,
757 cli_bind: Option<String>,
758 cli_allow_unauthenticated: bool,
759) -> Result<ServerConfig> {
760 let config = load_config(config_path)?;
761 let bind = cli_bind.unwrap_or_else(|| config.server_bind().to_string());
762 let env_unauth = std::env::var("OMNIGRAPH_UNAUTHENTICATED")
766 .ok()
767 .map(|v| {
768 let trimmed = v.trim();
769 !trimmed.is_empty() && trimmed != "0" && !trimmed.eq_ignore_ascii_case("false")
770 })
771 .unwrap_or(false);
772 let allow_unauthenticated = cli_allow_unauthenticated || env_unauth;
773
774 let has_cli_uri = cli_uri.is_some();
787 let has_cli_target = cli_target.is_some();
788 let has_server_graph = config.server_graph_name().is_some();
789 let has_graphs_map = !config.graphs.is_empty();
790 let has_explicit_config = config_path.is_some();
791
792 let mode = if has_cli_uri || has_cli_target || has_server_graph {
793 let raw_uri = config.resolve_target_uri(
795 cli_uri,
796 cli_target.as_deref(),
797 config.server_graph_name(),
798 )?;
799 let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
800 format!("normalize single-graph URI '{raw_uri}' from server settings")
801 })?;
802 let policy_file = config.resolve_policy_file();
803 ServerConfigMode::Single { uri, policy_file }
804 } else if has_explicit_config && has_graphs_map {
805 if config.resolve_policy_file().is_some() {
806 bail!(
807 "top-level `policy.file` is single-graph/CLI-local policy only; \
808 in multi-graph mode move per-graph rules to \
809 `graphs.<graph_id>.policy.file` and move `graph_list` rules to \
810 `server.policy.file`."
811 );
812 }
813 let mut graphs = Vec::with_capacity(config.graphs.len());
815 for (name, target) in &config.graphs {
816 GraphId::try_from(name.clone()).map_err(|err| {
820 color_eyre::eyre::eyre!("invalid graph id '{name}' in omnigraph.yaml: {err}")
821 })?;
822 let raw_uri = config.resolve_uri_value(&target.uri);
823 let uri = normalize_root_uri(&raw_uri).wrap_err_with(|| {
824 format!("normalize URI '{raw_uri}' for graph '{name}' in omnigraph.yaml")
825 })?;
826 graphs.push(GraphStartupConfig {
827 graph_id: name.clone(),
828 uri,
829 policy_file: config.resolve_target_policy_file(name),
830 });
831 }
832 let config_path = config_path
833 .cloned()
834 .expect("has_explicit_config implies config_path is Some");
835 let server_policy_file = config.resolve_server_policy_file();
836 ServerConfigMode::Multi {
837 graphs,
838 config_path,
839 server_policy_file,
840 }
841 } else {
842 bail!(
844 "no graph to serve: pass a URI (`omnigraph-server <URI>`), select a target \
845 (`--target <name> --config omnigraph.yaml`), set `server.graph: <name>` in \
846 omnigraph.yaml, or for multi-graph mode add a `graphs:` map to the config \
847 file referenced by `--config`."
848 );
849 };
850
851 Ok(ServerConfig {
852 mode,
853 bind,
854 allow_unauthenticated,
855 })
856}
857
858pub fn server_config_is_multi(config: &ServerConfig) -> bool {
861 matches!(config.mode, ServerConfigMode::Multi { .. })
862}
863
/// Security posture of a starting server, as decided by
/// `classify_server_runtime_state`.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ServerRuntimeState {
    /// No tokens and no policy, explicitly allowed to run unauthenticated.
    Open,
    /// Tokens configured but no policy file.
    DefaultDeny,
    /// Both tokens and a policy file configured.
    PolicyEnabled,
}
890
891pub fn classify_server_runtime_state(
902 has_tokens: bool,
903 has_policy: bool,
904 allow_unauthenticated: bool,
905) -> Result<ServerRuntimeState> {
906 match (has_tokens, has_policy, allow_unauthenticated) {
907 (false, false, false) => bail!(
908 "server has no bearer tokens and no policy file configured. This is a fully \
909 open server — pass `--unauthenticated` (or set OMNIGRAPH_UNAUTHENTICATED=1) \
910 if you actually want that, otherwise configure bearer tokens (see \
911 docs/user/server.md) and/or `policy.file` in omnigraph.yaml."
912 ),
913 (false, false, true) => Ok(ServerRuntimeState::Open),
914 (true, false, _) => Ok(ServerRuntimeState::DefaultDeny),
915 (false, true, _) => bail!(
916 "policy file is configured but no bearer tokens — every request would 401 \
917 because no token can ever match. Configure at least one bearer token (see \
918 docs/user/server.md), or remove the policy file. To deny all unauthenticated \
919 traffic deliberately, configure tokens plus a deny-all Cedar rule — that \
920 produces meaningful 403s with policy-decision logging instead of silent 401s."
921 ),
922 (true, true, _) => Ok(ServerRuntimeState::PolicyEnabled),
923 }
924}
925
/// Assembles the HTTP router for `state`.
///
/// Per-graph routes are mounted at the root in single-graph mode and under
/// `/graphs/{graph_id}` in multi-graph mode. `/graphs` is registered in both
/// modes. `/healthz` and `/openapi.json` are added outside the
/// bearer-auth-protected routers.
pub fn build_app(state: AppState) -> Router {
    // Routes that operate on one graph.
    let per_graph_protected = Router::new()
        .route("/snapshot", get(server_snapshot))
        .route("/export", post(server_export))
        .route("/read", post({
            #[allow(deprecated)]
            server_read
        }))
        .route("/query", post(server_query))
        .route("/change", post({
            #[allow(deprecated)]
            server_change
        }))
        .route("/mutate", post(server_mutate))
        .route("/schema", get(server_schema_get))
        .route("/schema/apply", post(server_schema_apply))
        .route(
            "/ingest",
            // Ingest gets its own, larger body limit.
            post(server_ingest).layer(DefaultBodyLimit::max(INGEST_REQUEST_BODY_LIMIT_BYTES)),
        )
        .route(
            "/branches",
            get(server_branch_list).post(server_branch_create),
        )
        .route("/branches/{branch}", delete(server_branch_delete))
        .route("/branches/merge", post(server_branch_merge))
        .route("/commits", get(server_commit_list))
        .route("/commits/{commit_id}", get(server_commit_show))
        // NOTE(review): with axum's layering the later `route_layer` is the
        // outer one, so auth presumably runs before graph resolution — confirm.
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            resolve_graph_handle,
        ))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    // Server-level route; authenticated but not tied to a single graph.
    let management = Router::new()
        .route("/graphs", get(server_graphs_list))
        .route_layer(middleware::from_fn_with_state(
            state.clone(),
            require_bearer_auth,
        ));

    let protected: Router<AppState> = match state.routing() {
        GraphRouting::Single { .. } => per_graph_protected.merge(management),
        GraphRouting::Multi { .. } => Router::new()
            .nest("/graphs/{graph_id}", per_graph_protected)
            .merge(management),
    };

    Router::new()
        .route("/healthz", get(server_health))
        .route("/openapi.json", get(server_openapi))
        .merge(protected)
        // Default body limit for everything that does not override it.
        .layer(DefaultBodyLimit::max(DEFAULT_REQUEST_BODY_LIMIT_BYTES))
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}
1010
1011pub async fn serve(config: ServerConfig) -> Result<()> {
1012 let token_source = resolve_token_source().await?;
1013 info!(source = token_source.name(), "loaded bearer token source");
1014 let tokens = token_source.load().await?;
1015
1016 let has_policy_configured = match &config.mode {
1021 ServerConfigMode::Single { policy_file, .. } => policy_file.is_some(),
1022 ServerConfigMode::Multi {
1023 graphs,
1024 server_policy_file,
1025 ..
1026 } => server_policy_file.is_some() || graphs.iter().any(|g| g.policy_file.is_some()),
1027 };
1028 let runtime_state = classify_server_runtime_state(
1029 !tokens.is_empty(),
1030 has_policy_configured,
1031 config.allow_unauthenticated,
1032 )?;
1033 match runtime_state {
1034 ServerRuntimeState::Open => warn!(
1035 "running with --unauthenticated: no bearer tokens, no policy file, all \
1036 requests permitted. This is for local dev only — do not expose to a \
1037 network you don't fully trust."
1038 ),
1039 ServerRuntimeState::DefaultDeny => warn!(
1040 "bearer tokens are configured but no policy file is set — running in \
1041 default-deny mode (only `read` actions are permitted for authenticated \
1042 actors). Configure `policy.file` in omnigraph.yaml to enable Cedar rules."
1043 ),
1044 ServerRuntimeState::PolicyEnabled => {}
1045 }
1046
1047 let bind = config.bind.clone();
1048 let state = match config.mode {
1049 ServerConfigMode::Single { uri, policy_file } => {
1050 let uri_for_log = uri.clone();
1051 info!(uri = %uri_for_log, bind = %bind, mode = "single", "serving omnigraph");
1052 AppState::open_with_bearer_tokens_and_policy(uri, tokens, policy_file.as_ref()).await?
1053 }
1054 ServerConfigMode::Multi {
1055 graphs,
1056 config_path,
1057 server_policy_file,
1058 } => {
1059 info!(
1060 bind = %bind,
1061 mode = "multi",
1062 graph_count = graphs.len(),
1063 config = %config_path.display(),
1064 "serving omnigraph"
1065 );
1066 open_multi_graph_state(graphs, tokens, server_policy_file.as_ref(), config_path).await?
1067 }
1068 };
1069
1070 let listener = TcpListener::bind(&bind).await?;
1071 axum::serve(listener, build_app(state))
1072 .with_graceful_shutdown(shutdown_signal())
1073 .await?;
1074 Ok(())
1075}
1076
1077async fn open_multi_graph_state(
1086 graphs: Vec<GraphStartupConfig>,
1087 tokens: Vec<(String, String)>,
1088 server_policy_file: Option<&PathBuf>,
1089 config_path: PathBuf,
1090) -> Result<AppState> {
1091 use futures::{StreamExt, TryStreamExt};
1092
1093 if graphs.is_empty() {
1094 bail!("multi-graph mode requires at least one graph in the `graphs:` map");
1095 }
1096
1097 let server_policy = match server_policy_file {
1102 Some(path) => Some(PolicyEngine::load_server(path)?),
1103 None => None,
1104 };
1105
1106 let handles: Vec<Arc<GraphHandle>> = futures::stream::iter(graphs.into_iter())
1111 .map(|cfg| async move { open_single_graph(cfg).await })
1112 .buffer_unordered(4)
1113 .try_collect()
1114 .await?;
1115
1116 let workload = workload::WorkloadController::from_env();
1117 let state = AppState::new_multi(handles, tokens, server_policy, workload, Some(config_path))
1118 .map_err(|err| color_eyre::eyre::eyre!("multi-graph registry: {err}"))?;
1119 Ok(state)
1120}
1121
1122async fn open_single_graph(cfg: GraphStartupConfig) -> Result<Arc<GraphHandle>> {
1125 let graph_id = GraphId::try_from(cfg.graph_id.clone())
1126 .map_err(|err| color_eyre::eyre::eyre!("graph id '{}': {err}", cfg.graph_id))?;
1127 let uri = normalize_root_uri(&cfg.uri)
1128 .wrap_err_with(|| format!("normalize URI for graph '{}'", cfg.graph_id))?;
1129
1130 let db = Omnigraph::open(&uri)
1131 .await
1132 .map_err(|err| color_eyre::eyre::eyre!("open graph '{}' at {}: {err}", graph_id, uri))?;
1133
1134 let (policy_arc, db) = match &cfg.policy_file {
1135 Some(path) => {
1136 let policy = PolicyEngine::load_graph(path, graph_id.as_str())?;
1137 let policy_arc: Arc<PolicyEngine> = Arc::new(policy);
1138 let checker = Arc::clone(&policy_arc) as Arc<dyn omnigraph_policy::PolicyChecker>;
1139 (Some(policy_arc), db.with_policy(checker))
1140 }
1141 None => (None, db),
1142 };
1143
1144 Ok(Arc::new(GraphHandle {
1145 key: GraphKey::cluster(graph_id),
1146 uri,
1147 engine: Arc::new(db),
1148 policy: policy_arc,
1149 }))
1150}
1151
1152async fn shutdown_signal() {
1153 if let Err(err) = tokio::signal::ctrl_c().await {
1154 error!(error = %err, "failed to install ctrl-c handler");
1155 return;
1156 }
1157 info!("shutdown signal received");
1158}
1159
1160#[utoipa::path(
1161 get,
1162 path = "/healthz",
1163 tag = "health",
1164 operation_id = "health",
1165 responses(
1166 (status = 200, description = "Server is healthy", body = HealthOutput),
1167 ),
1168)]
1169async fn server_health() -> Json<HealthOutput> {
1175 Json(HealthOutput {
1176 status: "ok".to_string(),
1177 version: SERVER_VERSION.to_string(),
1178 source_version: SERVER_SOURCE_VERSION.map(str::to_string),
1179 })
1180}
1181
1182#[utoipa::path(
1183 get,
1184 path = "/graphs",
1185 tag = "management",
1186 operation_id = "listGraphs",
1187 responses(
1188 (status = 200, description = "List of registered graphs", body = GraphListResponse),
1189 (status = 401, description = "Unauthorized", body = ErrorOutput),
1190 (status = 403, description = "Forbidden", body = ErrorOutput),
1191 (status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput),
1192 ),
1193 security(("bearer_token" = [])),
1194)]
1195async fn server_graphs_list(
1204 State(state): State<AppState>,
1205 actor: Option<Extension<ResolvedActor>>,
1206) -> std::result::Result<Json<GraphListResponse>, ApiError> {
1207 let registry = match state.routing() {
1210 GraphRouting::Single { .. } => {
1211 return Err(ApiError::method_not_allowed(
1212 "GET /graphs is only available in multi-graph mode",
1213 ));
1214 }
1215 GraphRouting::Multi { registry, .. } => registry,
1216 };
1217
1218 authorize_request(
1227 actor.as_ref().map(|Extension(actor)| actor),
1228 state.server_policy.as_deref(),
1229 PolicyRequest {
1230 action: PolicyAction::GraphList,
1231 branch: None,
1232 target_branch: None,
1233 },
1234 )?;
1235
1236 let mut graphs: Vec<GraphInfo> = registry
1237 .list()
1238 .into_iter()
1239 .map(|handle| GraphInfo {
1240 graph_id: handle.key.graph_id.as_str().to_string(),
1241 uri: handle.uri.clone(),
1242 })
1243 .collect();
1244 graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
1245 Ok(Json(GraphListResponse { graphs }))
1246}
1247
1248async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
1249 let mut doc = ApiDoc::openapi();
1250 if !state.requires_bearer_auth() {
1251 strip_security(&mut doc);
1252 }
1253 if matches!(state.routing(), GraphRouting::Multi { .. }) {
1258 nest_paths_under_cluster_prefix(&mut doc);
1259 }
1260 Json(doc)
1261}
1262
/// Path prefix prepended to per-graph OpenAPI paths in multi-graph mode; it
/// is the same literal `build_app` nests the per-graph router under.
const CLUSTER_PATH_PREFIX: &str = "/graphs/{graph_id}";

/// Prefix added to the operation ids of paths that get nested.
const CLUSTER_OPERATION_ID_PREFIX: &str = "cluster_";

/// OpenAPI paths that are left untouched when nesting.
const ALWAYS_FLAT_PATHS: &[&str] = &["/healthz", "/graphs"];
1279
1280fn nest_paths_under_cluster_prefix(doc: &mut utoipa::openapi::OpenApi) {
1291 let original = std::mem::take(&mut doc.paths.paths);
1292 let mut rewritten = std::collections::BTreeMap::new();
1293 for (path, mut item) in original {
1294 if ALWAYS_FLAT_PATHS.contains(&path.as_str()) {
1295 rewritten.insert(path, item);
1296 continue;
1297 }
1298 rename_operation_ids(&mut item, CLUSTER_OPERATION_ID_PREFIX);
1299 add_cluster_graph_id_parameter(&mut item);
1300 let new_path = format!("{CLUSTER_PATH_PREFIX}{path}");
1301 rewritten.insert(new_path, item);
1302 }
1303 doc.paths.paths = rewritten;
1304}
1305
1306fn add_cluster_graph_id_parameter(item: &mut utoipa::openapi::PathItem) {
1307 for op in path_item_operations_mut(item) {
1308 let parameters = op.parameters.get_or_insert_with(Vec::new);
1309 let has_graph_id = parameters
1310 .iter()
1311 .any(|param| param.name == "graph_id" && param.parameter_in == ParameterIn::Path);
1312 if !has_graph_id {
1313 parameters.insert(0, graph_id_path_parameter());
1314 }
1315 }
1316}
1317
1318fn graph_id_path_parameter() -> Parameter {
1319 let mut parameter = Parameter::new("graph_id");
1320 parameter.parameter_in = ParameterIn::Path;
1321 parameter.description = Some("Graph id to route the request to.".to_string());
1322 parameter.schema = Some(Object::with_type(Type::String).into());
1323 parameter
1324}
1325
1326fn rename_operation_ids(item: &mut utoipa::openapi::PathItem, prefix: &str) {
1328 for op in path_item_operations_mut(item) {
1329 if let Some(id) = op.operation_id.as_deref() {
1330 op.operation_id = Some(format!("{prefix}{id}"));
1331 }
1332 }
1333}
1334
1335fn path_item_operations_mut(
1336 item: &mut utoipa::openapi::PathItem,
1337) -> impl Iterator<Item = &mut utoipa::openapi::path::Operation> {
1338 [
1339 item.get.as_mut(),
1340 item.post.as_mut(),
1341 item.put.as_mut(),
1342 item.delete.as_mut(),
1343 item.options.as_mut(),
1344 item.head.as_mut(),
1345 item.patch.as_mut(),
1346 item.trace.as_mut(),
1347 ]
1348 .into_iter()
1349 .flatten()
1350}
1351
1352fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
1353 if let Some(components) = doc.components.as_mut() {
1354 components.security_schemes.clear();
1355 }
1356 for path_item in doc.paths.paths.values_mut() {
1357 for op in [
1358 path_item.get.as_mut(),
1359 path_item.post.as_mut(),
1360 path_item.put.as_mut(),
1361 path_item.delete.as_mut(),
1362 path_item.options.as_mut(),
1363 path_item.head.as_mut(),
1364 path_item.patch.as_mut(),
1365 path_item.trace.as_mut(),
1366 ]
1367 .into_iter()
1368 .flatten()
1369 {
1370 op.security = None;
1371 }
1372 }
1373}
1374
1375async fn require_bearer_auth(
1376 State(state): State<AppState>,
1377 mut request: Request,
1378 next: Next,
1379) -> std::result::Result<Response, ApiError> {
1380 if !state.requires_bearer_auth() {
1381 return Ok(next.run(request).await);
1382 }
1383
1384 let Some(header) = request
1385 .headers()
1386 .get(AUTHORIZATION)
1387 .and_then(|value| value.to_str().ok())
1388 else {
1389 return Err(ApiError::unauthorized("missing bearer token"));
1390 };
1391
1392 let Some(provided_token) = header.strip_prefix("Bearer ") else {
1393 return Err(ApiError::unauthorized("missing bearer token"));
1394 };
1395
1396 let Some(actor) = state.authenticate_bearer_token(provided_token) else {
1397 return Err(ApiError::unauthorized("invalid bearer token"));
1398 };
1399 request.extensions_mut().insert(actor);
1400
1401 Ok(next.run(request).await)
1402}
1403
1404async fn resolve_graph_handle(
1420 State(state): State<AppState>,
1421 mut request: Request,
1422 next: Next,
1423) -> std::result::Result<Response, ApiError> {
1424 let handle = match &state.routing {
1425 GraphRouting::Single { handle } => Arc::clone(handle),
1426 GraphRouting::Multi { registry, .. } => {
1427 let original_path: String = request
1435 .extensions()
1436 .get::<OriginalUri>()
1437 .map(|OriginalUri(uri)| uri.path().to_string())
1438 .unwrap_or_else(|| request.uri().path().to_string());
1439 let graph_id_str = original_path
1440 .strip_prefix("/graphs/")
1441 .and_then(|rest| rest.split('/').next())
1442 .filter(|s| !s.is_empty())
1443 .ok_or_else(|| {
1444 ApiError::bad_request(
1445 "cluster route missing /graphs/{graph_id} prefix".to_string(),
1446 )
1447 })?;
1448 let graph_id = GraphId::try_from(graph_id_str.to_string())
1449 .map_err(|err| ApiError::bad_request(err.to_string()))?;
1450 let key = GraphKey::cluster(graph_id.clone());
1451 match registry.get(&key) {
1452 RegistryLookup::Ready(handle) => handle,
1453 RegistryLookup::Gone => {
1454 return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
1455 }
1456 }
1457 }
1458 };
1459
1460 info!(graph_id = %handle.key.graph_id, "graph routed");
1465
1466 request.extensions_mut().insert(handle);
1467 Ok(next.run(request).await)
1468}
1469
1470fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
1471 info!(
1472 actor_id = actor_id,
1473 action = %request.action,
1474 branch = request.branch.as_deref().unwrap_or(""),
1475 target_branch = request.target_branch.as_deref().unwrap_or(""),
1476 allowed = decision.allowed,
1477 matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
1478 "policy decision"
1479 );
1480}
1481
1482fn authorize_request(
1497 actor: Option<&ResolvedActor>,
1498 policy: Option<&PolicyEngine>,
1499 request: PolicyRequest,
1500) -> std::result::Result<(), ApiError> {
1501 let Some(engine) = policy else {
1502 if request.action.resource_kind() == PolicyResourceKind::Server {
1527 return Err(ApiError::forbidden(
1528 "server-scoped actions require an explicit `server.policy.file` \
1529 configured in omnigraph.yaml — the management surface is closed \
1530 by default in every runtime state, including --unauthenticated, \
1531 so that server topology is never exposed without operator opt-in.",
1532 ));
1533 }
1534 if actor.is_some() && request.action != PolicyAction::Read {
1535 return Err(ApiError::forbidden(
1536 "server runs in default-deny mode (bearer tokens configured but no \
1537 policy file). Only `read` actions are permitted; configure \
1538 `policy.file` in omnigraph.yaml to enable other actions.",
1539 ));
1540 }
1541 return Ok(());
1542 };
1543 let Some(actor) = actor else {
1544 return Err(ApiError::unauthorized("missing bearer token"));
1545 };
1546 let actor_id = actor.actor_id.as_ref();
1558 let decision = engine
1559 .authorize(actor_id, &request)
1560 .map_err(|err| ApiError::internal(format!("policy: {err}")))?;
1561 log_policy_decision(actor_id, &request, &decision);
1562 if decision.allowed {
1563 Ok(())
1564 } else {
1565 Err(ApiError::forbidden(decision.message))
1566 }
1567}
1568
1569#[utoipa::path(
1570 get,
1571 path = "/snapshot",
1572 tag = "snapshots",
1573 operation_id = "getSnapshot",
1574 params(SnapshotQuery),
1575 responses(
1576 (status = 200, description = "Database snapshot", body = api::SnapshotOutput),
1577 (status = 401, description = "Unauthorized", body = ErrorOutput),
1578 (status = 403, description = "Forbidden", body = ErrorOutput),
1579 ),
1580 security(("bearer_token" = [])),
1581)]
1582async fn server_snapshot(
1588 Extension(handle): Extension<Arc<GraphHandle>>,
1589 actor: Option<Extension<ResolvedActor>>,
1590 Query(query): Query<SnapshotQuery>,
1591) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
1592 let branch = query.branch.unwrap_or_else(|| "main".to_string());
1593 authorize_request(
1594 actor.as_ref().map(|Extension(actor)| actor),
1595 handle.policy.as_deref(),
1596 PolicyRequest {
1597 action: PolicyAction::Read,
1598 branch: Some(branch.clone()),
1599 target_branch: None,
1600 },
1601 )?;
1602 let snapshot = {
1603 let db = &handle.engine;
1604 db.snapshot_of(ReadTarget::branch(branch.as_str()))
1605 .await
1606 .map_err(ApiError::from_omni)?
1607 };
1608 Ok(Json(snapshot_payload(&branch, &snapshot)))
1609}
1610
1611fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValue); 2] {
1614 [
1615 (
1616 HeaderName::from_static("deprecation"),
1617 HeaderValue::from_static("true"),
1618 ),
1619 (
1620 HeaderName::from_static("link"),
1621 HeaderValue::from_static(successor_link),
1622 ),
1623 ]
1624}
1625
1626#[utoipa::path(
1627 post,
1628 path = "/read",
1629 tag = "queries",
1630 operation_id = "read",
1631 request_body = ReadRequest,
1632 responses(
1633 (status = 200, description = "Query results (response includes `Deprecation: true` + `Link: </query>; rel=\"successor-version\"`)", body = ReadOutput),
1634 (status = 400, description = "Bad request", body = ErrorOutput),
1635 (status = 401, description = "Unauthorized", body = ErrorOutput),
1636 (status = 403, description = "Forbidden", body = ErrorOutput),
1637 ),
1638 security(("bearer_token" = [])),
1639)]
1640#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
1641async fn server_read(
1651 Extension(handle): Extension<Arc<GraphHandle>>,
1652 actor: Option<Extension<ResolvedActor>>,
1653 Json(request): Json<ReadRequest>,
1654) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ReadOutput>), ApiError> {
1655 let (selected_name, target, result) = run_query(
1656 handle,
1657 actor.as_ref().map(|Extension(actor)| actor),
1658 &request.query_source,
1659 request.query_name.as_deref(),
1660 request.params.as_ref(),
1661 request.branch,
1662 request.snapshot,
1663 false, )
1665 .await?;
1666 Ok((
1667 deprecation_headers("</query>; rel=\"successor-version\""),
1668 Json(api::read_output(selected_name, &target, result)),
1669 ))
1670}
1671
1672#[utoipa::path(
1673 post,
1674 path = "/query",
1675 tag = "queries",
1676 operation_id = "query",
1677 request_body = QueryRequest,
1678 responses(
1679 (status = 200, description = "Query results", body = ReadOutput),
1680 (status = 400, description = "Bad request - also returned when the query body contains mutations; use POST /mutate (or its deprecated alias POST /change) for write queries", body = ErrorOutput),
1681 (status = 401, description = "Unauthorized", body = ErrorOutput),
1682 (status = 403, description = "Forbidden", body = ErrorOutput),
1683 ),
1684 security(("bearer_token" = [])),
1685)]
1686async fn server_query(
1696 Extension(handle): Extension<Arc<GraphHandle>>,
1697 actor: Option<Extension<ResolvedActor>>,
1698 Json(request): Json<QueryRequest>,
1699) -> std::result::Result<Json<ReadOutput>, ApiError> {
1700 let (selected_name, target, result) = run_query(
1701 handle,
1702 actor.as_ref().map(|Extension(actor)| actor),
1703 &request.query,
1704 request.name.as_deref(),
1705 request.params.as_ref(),
1706 request.branch,
1707 request.snapshot,
1708 true, )
1710 .await?;
1711 Ok(Json(api::read_output(selected_name, &target, result)))
1712}
1713
1714#[utoipa::path(
1715 post,
1716 path = "/export",
1717 tag = "queries",
1718 operation_id = "export",
1719 request_body = ExportRequest,
1720 responses(
1721 (status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
1722 (status = 400, description = "Bad request", body = ErrorOutput),
1723 (status = 401, description = "Unauthorized", body = ErrorOutput),
1724 (status = 403, description = "Forbidden", body = ErrorOutput),
1725 ),
1726 security(("bearer_token" = [])),
1727)]
1728async fn server_export(
1735 Extension(handle): Extension<Arc<GraphHandle>>,
1736 actor: Option<Extension<ResolvedActor>>,
1737 Json(request): Json<ExportRequest>,
1738) -> std::result::Result<Response, ApiError> {
1739 let branch = request.branch.unwrap_or_else(|| "main".to_string());
1740 authorize_request(
1741 actor.as_ref().map(|Extension(actor)| actor),
1742 handle.policy.as_deref(),
1743 PolicyRequest {
1744 action: PolicyAction::Export,
1745 branch: Some(branch.clone()),
1746 target_branch: None,
1747 },
1748 )?;
1749 let engine = Arc::clone(&handle.engine);
1750 let type_names = request.type_names.clone();
1751 let table_keys = request.table_keys.clone();
1752 let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
1753 tokio::spawn(async move {
1754 let result = {
1755 let mut writer = ExportStreamWriter { sender: tx.clone() };
1756 engine
1757 .export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
1758 .await
1759 };
1760 if let Err(err) = result {
1761 let _ = tx.send(Err(io::Error::other(err.to_string())));
1762 }
1763 });
1764 let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
1765 rx.recv().await.map(|item| (item, rx))
1766 }));
1767 Ok((
1768 StatusCode::OK,
1769 [(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
1770 body,
1771 )
1772 .into_response())
1773}
1774
1775async fn run_mutate(
1786 state: AppState,
1787 handle: Arc<GraphHandle>,
1788 actor: Option<&ResolvedActor>,
1789 query: &str,
1790 name: Option<&str>,
1791 params_json: Option<&Value>,
1792 branch: String,
1793) -> std::result::Result<ChangeOutput, ApiError> {
1794 let actor_arc = actor
1795 .map(|a| Arc::clone(&a.actor_id))
1796 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
1797 let actor_id = actor.map(|a| a.actor_id.as_ref());
1798 authorize_request(
1799 actor,
1800 handle.policy.as_deref(),
1801 PolicyRequest {
1802 action: PolicyAction::Change,
1803 branch: Some(branch.clone()),
1804 target_branch: None,
1805 },
1806 )?;
1807 let est_bytes = query.len() as u64
1812 + params_json
1813 .map(|p| p.to_string().len() as u64)
1814 .unwrap_or(0);
1815 let _admission = state
1816 .workload
1817 .try_admit(&actor_arc, est_bytes)
1818 .map_err(ApiError::from_workload_reject)?;
1819 let (selected_name, query_params) =
1820 select_named_query(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
1821 let params = query_params_from_json(&query_params, params_json)
1822 .map_err(|err| ApiError::bad_request(err.to_string()))?;
1823
1824 let result = {
1825 let db = &handle.engine;
1826 db.mutate_as(&branch, query, &selected_name, ¶ms, actor_id)
1827 .await
1828 .map_err(ApiError::from_omni)?
1829 };
1830 Ok(ChangeOutput {
1831 branch,
1832 query_name: selected_name,
1833 affected_nodes: result.affected_nodes,
1834 affected_edges: result.affected_edges,
1835 actor_id: actor_id.map(str::to_string),
1836 })
1837}
1838
1839async fn run_query(
1852 handle: Arc<GraphHandle>,
1853 actor: Option<&ResolvedActor>,
1854 query: &str,
1855 name: Option<&str>,
1856 params_json: Option<&Value>,
1857 branch: Option<String>,
1858 snapshot: Option<String>,
1859 reject_mutations: bool,
1860) -> std::result::Result<(String, ReadTarget, omnigraph_compiler::result::QueryResult), ApiError> {
1861 if branch.is_some() && snapshot.is_some() {
1862 return Err(ApiError::bad_request(
1863 "request may specify branch or snapshot, not both",
1864 ));
1865 }
1866
1867 let target = read_target_from_request(branch, snapshot);
1868 let policy_branch = match &target {
1869 ReadTarget::Branch(branch) => Some(branch.clone()),
1870 ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
1871 let db = &handle.engine;
1872 db.resolved_branch_of(target.clone())
1873 .await
1874 .map(|branch| branch.or_else(|| Some("main".to_string())))
1875 .map_err(ApiError::from_omni)?
1876 }
1877 ReadTarget::Snapshot(_) => None,
1878 };
1879 authorize_request(
1880 actor,
1881 handle.policy.as_deref(),
1882 PolicyRequest {
1883 action: PolicyAction::Read,
1884 branch: policy_branch,
1885 target_branch: None,
1886 },
1887 )?;
1888 let query_decl =
1889 select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
1890 if reject_mutations && !query_decl.mutations.is_empty() {
1891 return Err(ApiError::bad_request(format!(
1892 "query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
1893 query_decl.name
1894 )));
1895 }
1896 let selected_name = query_decl.name.clone();
1897 let params = query_params_from_json(&query_decl.params, params_json)
1898 .map_err(|err| ApiError::bad_request(err.to_string()))?;
1899
1900 let result = {
1901 let db = &handle.engine;
1902 db.query(target.clone(), query, &selected_name, ¶ms)
1903 .await
1904 .map_err(ApiError::from_omni)?
1905 };
1906 Ok((selected_name, target, result))
1907}
1908
1909#[utoipa::path(
1910 post,
1911 path = "/change",
1912 tag = "mutations",
1913 operation_id = "change",
1914 request_body = ChangeRequest,
1915 responses(
1916 (status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: </mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
1917 (status = 400, description = "Bad request", body = ErrorOutput),
1918 (status = 401, description = "Unauthorized", body = ErrorOutput),
1919 (status = 403, description = "Forbidden", body = ErrorOutput),
1920 (status = 409, description = "Merge conflict", body = ErrorOutput),
1921 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
1922 ),
1923 security(("bearer_token" = [])),
1924)]
1925#[deprecated(note = "use POST /mutate instead; /change is kept indefinitely for back-compat")]
1926async fn server_change(
1936 State(state): State<AppState>,
1937 Extension(handle): Extension<Arc<GraphHandle>>,
1938 actor: Option<Extension<ResolvedActor>>,
1939 Json(request): Json<ChangeRequest>,
1940) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ChangeOutput>), ApiError> {
1941 let branch = request.branch.unwrap_or_else(|| "main".to_string());
1942 let output = run_mutate(
1943 state,
1944 handle,
1945 actor.as_ref().map(|Extension(actor)| actor),
1946 &request.query,
1947 request.name.as_deref(),
1948 request.params.as_ref(),
1949 branch,
1950 )
1951 .await?;
1952 Ok((
1953 deprecation_headers("</mutate>; rel=\"successor-version\""),
1954 Json(output),
1955 ))
1956}
1957
1958#[utoipa::path(
1959 post,
1960 path = "/mutate",
1961 tag = "mutations",
1962 operation_id = "mutate",
1963 request_body = ChangeRequest,
1964 responses(
1965 (status = 200, description = "Mutation results", body = ChangeOutput),
1966 (status = 400, description = "Bad request", body = ErrorOutput),
1967 (status = 401, description = "Unauthorized", body = ErrorOutput),
1968 (status = 403, description = "Forbidden", body = ErrorOutput),
1969 (status = 409, description = "Merge conflict", body = ErrorOutput),
1970 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
1971 ),
1972 security(("bearer_token" = [])),
1973)]
1974async fn server_mutate(
1984 State(state): State<AppState>,
1985 Extension(handle): Extension<Arc<GraphHandle>>,
1986 actor: Option<Extension<ResolvedActor>>,
1987 Json(request): Json<ChangeRequest>,
1988) -> std::result::Result<Json<ChangeOutput>, ApiError> {
1989 let branch = request.branch.unwrap_or_else(|| "main".to_string());
1990 Ok(Json(
1991 run_mutate(
1992 state,
1993 handle,
1994 actor.as_ref().map(|Extension(actor)| actor),
1995 &request.query,
1996 request.name.as_deref(),
1997 request.params.as_ref(),
1998 branch,
1999 )
2000 .await?,
2001 ))
2002}
2003
2004#[utoipa::path(
2005 get,
2006 path = "/schema",
2007 tag = "schema",
2008 operation_id = "getSchema",
2009 responses(
2010 (status = 200, description = "Current schema source", body = SchemaOutput),
2011 (status = 401, description = "Unauthorized", body = ErrorOutput),
2012 (status = 403, description = "Forbidden", body = ErrorOutput),
2013 ),
2014 security(("bearer_token" = [])),
2015)]
2016async fn server_schema_get(
2022 Extension(handle): Extension<Arc<GraphHandle>>,
2023 actor: Option<Extension<ResolvedActor>>,
2024) -> std::result::Result<Json<SchemaOutput>, ApiError> {
2025 authorize_request(
2026 actor.as_ref().map(|Extension(actor)| actor),
2027 handle.policy.as_deref(),
2028 PolicyRequest {
2029 action: PolicyAction::Read,
2030 branch: None,
2031 target_branch: None,
2032 },
2033 )?;
2034 let schema_source = {
2035 let db = &handle.engine;
2036 db.schema_source().to_string()
2037 };
2038 Ok(Json(SchemaOutput { schema_source }))
2039}
2040
2041#[utoipa::path(
2042 post,
2043 path = "/schema/apply",
2044 tag = "mutations",
2045 operation_id = "applySchema",
2046 request_body = SchemaApplyRequest,
2047 responses(
2048 (status = 200, description = "Schema apply results", body = SchemaApplyOutput),
2049 (status = 400, description = "Bad request", body = ErrorOutput),
2050 (status = 401, description = "Unauthorized", body = ErrorOutput),
2051 (status = 403, description = "Forbidden", body = ErrorOutput),
2052 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2053 ),
2054 security(("bearer_token" = [])),
2055)]
2056async fn server_schema_apply(
2063 State(state): State<AppState>,
2064 Extension(handle): Extension<Arc<GraphHandle>>,
2065 actor: Option<Extension<ResolvedActor>>,
2066 Json(request): Json<SchemaApplyRequest>,
2067) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
2068 let actor_arc = actor
2069 .as_ref()
2070 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2071 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2072 let actor_id = actor
2073 .as_ref()
2074 .map(|Extension(actor)| actor.actor_id.as_ref());
2075 authorize_request(
2076 actor.as_ref().map(|Extension(actor)| actor),
2077 handle.policy.as_deref(),
2078 PolicyRequest {
2079 action: PolicyAction::SchemaApply,
2080 branch: None,
2081 target_branch: Some("main".to_string()),
2082 },
2083 )?;
2084 let est_bytes = request.schema_source.len() as u64;
2085 let _admission = state
2086 .workload
2087 .try_admit(&actor_arc, est_bytes)
2088 .map_err(ApiError::from_workload_reject)?;
2089 let result = {
2090 let db = &handle.engine;
2091 db.apply_schema_as(
2098 &request.schema_source,
2099 omnigraph::db::SchemaApplyOptions {
2100 allow_data_loss: request.allow_data_loss,
2101 },
2102 actor_id,
2103 )
2104 .await
2105 .map_err(ApiError::from_omni)?
2106 };
2107 Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
2108}
2109
2110#[utoipa::path(
2111 post,
2112 path = "/ingest",
2113 tag = "mutations",
2114 operation_id = "ingest",
2115 request_body = IngestRequest,
2116 responses(
2117 (status = 200, description = "Ingest results", body = IngestOutput),
2118 (status = 400, description = "Bad request", body = ErrorOutput),
2119 (status = 401, description = "Unauthorized", body = ErrorOutput),
2120 (status = 403, description = "Forbidden", body = ErrorOutput),
2121 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2122 ),
2123 security(("bearer_token" = [])),
2124)]
2125async fn server_ingest(
2133 State(state): State<AppState>,
2134 Extension(handle): Extension<Arc<GraphHandle>>,
2135 actor: Option<Extension<ResolvedActor>>,
2136 Json(request): Json<IngestRequest>,
2137) -> std::result::Result<Json<IngestOutput>, ApiError> {
2138 let branch = request.branch.unwrap_or_else(|| "main".to_string());
2139 let from = request.from.unwrap_or_else(|| "main".to_string());
2140 let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
2141 let actor_arc = actor
2142 .as_ref()
2143 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2144 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2145 let actor_id = actor
2146 .as_ref()
2147 .map(|Extension(actor)| actor.actor_id.as_ref());
2148
2149 let branch_exists = {
2150 let db = &handle.engine;
2151 db.branch_list()
2152 .await
2153 .map_err(ApiError::from_omni)?
2154 .into_iter()
2155 .any(|name| name == branch)
2156 };
2157
2158 if !branch_exists {
2159 authorize_request(
2160 actor.as_ref().map(|Extension(actor)| actor),
2161 handle.policy.as_deref(),
2162 PolicyRequest {
2163 action: PolicyAction::BranchCreate,
2164 branch: Some(from.clone()),
2165 target_branch: Some(branch.clone()),
2166 },
2167 )?;
2168 }
2169 authorize_request(
2170 actor.as_ref().map(|Extension(actor)| actor),
2171 handle.policy.as_deref(),
2172 PolicyRequest {
2173 action: PolicyAction::Change,
2174 branch: Some(branch.clone()),
2175 target_branch: None,
2176 },
2177 )?;
2178 let est_bytes = request.data.len() as u64;
2179 let _admission = state
2180 .workload
2181 .try_admit(&actor_arc, est_bytes)
2182 .map_err(ApiError::from_workload_reject)?;
2183
2184 let result = {
2185 let db = &handle.engine;
2186 db.ingest_as(&branch, Some(&from), &request.data, mode, actor_id)
2187 .await
2188 .map_err(ApiError::from_omni)?
2189 };
2190
2191 Ok(Json(ingest_output(
2192 handle.uri.as_str(),
2193 &result,
2194 actor_id.map(str::to_string),
2195 )))
2196}
2197
2198#[utoipa::path(
2199 get,
2200 path = "/branches",
2201 tag = "branches",
2202 operation_id = "listBranches",
2203 responses(
2204 (status = 200, description = "List of branches", body = BranchListOutput),
2205 (status = 401, description = "Unauthorized", body = ErrorOutput),
2206 (status = 403, description = "Forbidden", body = ErrorOutput),
2207 ),
2208 security(("bearer_token" = [])),
2209)]
2210async fn server_branch_list(
2214 Extension(handle): Extension<Arc<GraphHandle>>,
2215 actor: Option<Extension<ResolvedActor>>,
2216) -> std::result::Result<Json<BranchListOutput>, ApiError> {
2217 authorize_request(
2218 actor.as_ref().map(|Extension(actor)| actor),
2219 handle.policy.as_deref(),
2220 PolicyRequest {
2221 action: PolicyAction::Read,
2222 branch: None,
2223 target_branch: None,
2224 },
2225 )?;
2226 let mut branches = {
2227 let db = &handle.engine;
2228 db.branch_list().await.map_err(ApiError::from_omni)?
2229 };
2230 branches.sort();
2231 Ok(Json(BranchListOutput { branches }))
2232}
2233
2234#[utoipa::path(
2235 post,
2236 path = "/branches",
2237 tag = "branches",
2238 operation_id = "createBranch",
2239 request_body = BranchCreateRequest,
2240 responses(
2241 (status = 200, description = "Branch created", body = BranchCreateOutput),
2242 (status = 400, description = "Bad request", body = ErrorOutput),
2243 (status = 401, description = "Unauthorized", body = ErrorOutput),
2244 (status = 403, description = "Forbidden", body = ErrorOutput),
2245 (status = 409, description = "Branch already exists", body = ErrorOutput),
2246 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2247 ),
2248 security(("bearer_token" = [])),
2249)]
2250async fn server_branch_create(
2256 State(state): State<AppState>,
2257 Extension(handle): Extension<Arc<GraphHandle>>,
2258 actor: Option<Extension<ResolvedActor>>,
2259 Json(request): Json<BranchCreateRequest>,
2260) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
2261 let from = request.from.unwrap_or_else(|| "main".to_string());
2262 let actor_arc = actor
2263 .as_ref()
2264 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2265 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2266 authorize_request(
2267 actor.as_ref().map(|Extension(actor)| actor),
2268 handle.policy.as_deref(),
2269 PolicyRequest {
2270 action: PolicyAction::BranchCreate,
2271 branch: Some(from.clone()),
2272 target_branch: Some(request.name.clone()),
2273 },
2274 )?;
2275 let _admission = state
2279 .workload
2280 .try_admit(&actor_arc, 256)
2281 .map_err(ApiError::from_workload_reject)?;
2282 {
2283 let db = &handle.engine;
2284 db.branch_create_from_as(
2285 ReadTarget::branch(&from),
2286 &request.name,
2287 actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
2288 )
2289 .await
2290 .map_err(ApiError::from_omni)?;
2291 }
2292 Ok(Json(BranchCreateOutput {
2293 uri: handle.uri.clone(),
2294 from,
2295 name: request.name,
2296 actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
2297 }))
2298}
2299
/// Path parameters of the `/branches/{branch}` route, deserialized by the
/// `Path` extractor in `server_branch_delete`.
#[derive(Deserialize)]
struct BranchPath {
    /// Value of the `{branch}` path segment.
    branch: String,
}
2313
2314#[utoipa::path(
2315 delete,
2316 path = "/branches/{branch}",
2317 tag = "branches",
2318 operation_id = "deleteBranch",
2319 params(
2320 ("branch" = String, Path, description = "Branch name to delete"),
2321 ),
2322 responses(
2323 (status = 200, description = "Branch deleted", body = BranchDeleteOutput),
2324 (status = 401, description = "Unauthorized", body = ErrorOutput),
2325 (status = 403, description = "Forbidden", body = ErrorOutput),
2326 (status = 404, description = "Branch not found", body = ErrorOutput),
2327 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2328 ),
2329 security(("bearer_token" = [])),
2330)]
2331async fn server_branch_delete(
2337 State(state): State<AppState>,
2338 Extension(handle): Extension<Arc<GraphHandle>>,
2339 actor: Option<Extension<ResolvedActor>>,
2340 Path(BranchPath { branch }): Path<BranchPath>,
2341) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
2342 let actor_arc = actor
2343 .as_ref()
2344 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2345 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2346 let actor_id = actor
2347 .as_ref()
2348 .map(|Extension(actor)| actor.actor_id.as_ref());
2349 authorize_request(
2350 actor.as_ref().map(|Extension(actor)| actor),
2351 handle.policy.as_deref(),
2352 PolicyRequest {
2353 action: PolicyAction::BranchDelete,
2354 branch: None,
2355 target_branch: Some(branch.clone()),
2356 },
2357 )?;
2358 let _admission = state
2360 .workload
2361 .try_admit(&actor_arc, 256)
2362 .map_err(ApiError::from_workload_reject)?;
2363 {
2364 let db = &handle.engine;
2365 db.branch_delete_as(&branch, actor_id)
2366 .await
2367 .map_err(ApiError::from_omni)?;
2368 }
2369 Ok(Json(BranchDeleteOutput {
2370 uri: handle.uri.clone(),
2371 name: branch,
2372 actor_id: actor_id.map(str::to_string),
2373 }))
2374}
2375
2376#[utoipa::path(
2377 post,
2378 path = "/branches/merge",
2379 tag = "branches",
2380 operation_id = "mergeBranches",
2381 request_body = BranchMergeRequest,
2382 responses(
2383 (status = 200, description = "Branches merged", body = BranchMergeOutput),
2384 (status = 400, description = "Bad request", body = ErrorOutput),
2385 (status = 401, description = "Unauthorized", body = ErrorOutput),
2386 (status = 403, description = "Forbidden", body = ErrorOutput),
2387 (status = 409, description = "Merge conflict", body = ErrorOutput),
2388 (status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
2389 ),
2390 security(("bearer_token" = [])),
2391)]
2392async fn server_branch_merge(
2399 State(state): State<AppState>,
2400 Extension(handle): Extension<Arc<GraphHandle>>,
2401 actor: Option<Extension<ResolvedActor>>,
2402 Json(request): Json<BranchMergeRequest>,
2403) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
2404 let target = request.target.unwrap_or_else(|| "main".to_string());
2405 let actor_arc = actor
2406 .as_ref()
2407 .map(|Extension(actor)| Arc::clone(&actor.actor_id))
2408 .unwrap_or_else(|| Arc::<str>::from("anonymous"));
2409 let actor_id = actor
2410 .as_ref()
2411 .map(|Extension(actor)| actor.actor_id.as_ref());
2412 authorize_request(
2413 actor.as_ref().map(|Extension(actor)| actor),
2414 handle.policy.as_deref(),
2415 PolicyRequest {
2416 action: PolicyAction::BranchMerge,
2417 branch: Some(request.source.clone()),
2418 target_branch: Some(target.clone()),
2419 },
2420 )?;
2421 let _admission = state
2425 .workload
2426 .try_admit(&actor_arc, 256)
2427 .map_err(ApiError::from_workload_reject)?;
2428 let outcome = {
2429 let db = &handle.engine;
2430 db.branch_merge_as(&request.source, &target, actor_id)
2431 .await
2432 .map_err(ApiError::from_omni)?
2433 };
2434 Ok(Json(BranchMergeOutput {
2435 source: request.source,
2436 target,
2437 outcome: outcome.into(),
2438 actor_id: actor_id.map(str::to_string),
2439 }))
2440}
2441
2442#[utoipa::path(
2443 get,
2444 path = "/commits",
2445 tag = "commits",
2446 operation_id = "listCommits",
2447 params(CommitListQuery),
2448 responses(
2449 (status = 200, description = "List of commits", body = CommitListOutput),
2450 (status = 401, description = "Unauthorized", body = ErrorOutput),
2451 (status = 403, description = "Forbidden", body = ErrorOutput),
2452 ),
2453 security(("bearer_token" = [])),
2454)]
2455async fn server_commit_list(
2460 Extension(handle): Extension<Arc<GraphHandle>>,
2461 actor: Option<Extension<ResolvedActor>>,
2462 Query(query): Query<CommitListQuery>,
2463) -> std::result::Result<Json<CommitListOutput>, ApiError> {
2464 authorize_request(
2465 actor.as_ref().map(|Extension(actor)| actor),
2466 handle.policy.as_deref(),
2467 PolicyRequest {
2468 action: PolicyAction::Read,
2469 branch: query.branch.clone(),
2470 target_branch: None,
2471 },
2472 )?;
2473 let commits = {
2474 let db = &handle.engine;
2475 db.list_commits(query.branch.as_deref())
2476 .await
2477 .map_err(ApiError::from_omni)?
2478 };
2479 Ok(Json(CommitListOutput {
2480 commits: commits.iter().map(api::commit_output).collect(),
2481 }))
2482}
2483
/// Path parameters of the `/commits/{commit_id}` route, deserialized by the
/// `Path` extractor in `server_commit_show`.
#[derive(Deserialize)]
struct CommitPath {
    /// Value of the `{commit_id}` path segment.
    commit_id: String,
}
2490
2491#[utoipa::path(
2492 get,
2493 path = "/commits/{commit_id}",
2494 tag = "commits",
2495 operation_id = "getCommit",
2496 params(
2497 ("commit_id" = String, Path, description = "Commit identifier"),
2498 ),
2499 responses(
2500 (status = 200, description = "Commit details", body = api::CommitOutput),
2501 (status = 401, description = "Unauthorized", body = ErrorOutput),
2502 (status = 403, description = "Forbidden", body = ErrorOutput),
2503 (status = 404, description = "Commit not found", body = ErrorOutput),
2504 ),
2505 security(("bearer_token" = [])),
2506)]
2507
2508async fn server_commit_show(
2513 Extension(handle): Extension<Arc<GraphHandle>>,
2514 actor: Option<Extension<ResolvedActor>>,
2515 Path(CommitPath { commit_id }): Path<CommitPath>,
2516) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
2517 authorize_request(
2518 actor.as_ref().map(|Extension(actor)| actor),
2519 handle.policy.as_deref(),
2520 PolicyRequest {
2521 action: PolicyAction::Read,
2522 branch: None,
2523 target_branch: None,
2524 },
2525 )?;
2526 let commit = {
2527 let db = &handle.engine;
2528 db.get_commit(&commit_id)
2529 .await
2530 .map_err(ApiError::from_omni)?
2531 };
2532 Ok(Json(api::commit_output(&commit)))
2533}
2534
2535fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
2536 if let Some(snapshot) = snapshot {
2537 ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
2538 } else {
2539 ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
2540 }
2541}
2542
2543fn select_named_query_decl(
2544 query_source: &str,
2545 requested_name: Option<&str>,
2546) -> Result<omnigraph_compiler::query::ast::QueryDecl> {
2547 let parsed = parse_query(query_source)?;
2548 let query = if let Some(name) = requested_name {
2549 parsed
2550 .queries
2551 .into_iter()
2552 .find(|query| query.name == name)
2553 .ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
2554 } else if parsed.queries.len() == 1 {
2555 parsed.queries.into_iter().next().unwrap()
2556 } else {
2557 bail!("query file contains multiple queries; pass --name");
2558 };
2559 Ok(query)
2560}
2561
2562fn select_named_query(
2563 query_source: &str,
2564 requested_name: Option<&str>,
2565) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
2566 let query = select_named_query_decl(query_source, requested_name)?;
2567 Ok((query.name, query.params))
2568}
2569
2570fn query_params_from_json(
2571 query_params: &[omnigraph_compiler::query::ast::Param],
2572 params_json: Option<&Value>,
2573) -> Result<ParamMap> {
2574 json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
2575 .map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
2576}
2577
/// Trims surrounding whitespace from an optional value and discards it when
/// nothing is left.
///
/// Returns `None` for `None`, for an empty string, and for a string made up
/// only of whitespace; otherwise returns the trimmed text.
fn normalize_bearer_token(value: Option<String>) -> Option<String> {
    match value.as_deref().map(str::trim) {
        Some(trimmed) if !trimmed.is_empty() => Some(trimmed.to_string()),
        _ => None,
    }
}
2583
2584fn normalize_bearer_actor(value: String) -> Result<String> {
2585 let value = value.trim().to_string();
2586 if value.is_empty() {
2587 bail!("bearer token actor names must not be blank");
2588 }
2589 Ok(value)
2590}
2591
2592fn parse_bearer_tokens_json(value: &str) -> Result<Vec<(String, String)>> {
2593 let entries: HashMap<String, String> = serde_json::from_str(value)
2594 .wrap_err("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON must be a JSON object of actor->token")?;
2595 Ok(entries.into_iter().collect())
2596}
2597
2598fn read_bearer_tokens_file(path: &str) -> Result<Vec<(String, String)>> {
2599 let contents = fs::read_to_string(path)
2600 .wrap_err_with(|| format!("failed to read bearer tokens file at {path}"))?;
2601 parse_bearer_tokens_json(&contents)
2602 .wrap_err_with(|| format!("failed to parse bearer tokens file at {path}"))
2603}
2604
2605fn validate_bearer_tokens(entries: Vec<(String, String)>) -> Result<Vec<(String, String)>> {
2606 let mut seen_actors = HashSet::new();
2607 let mut seen_tokens = HashSet::new();
2608 let mut normalized = Vec::with_capacity(entries.len());
2609
2610 for (actor, token) in entries {
2611 let actor = normalize_bearer_actor(actor)?;
2612 let Some(token) = normalize_bearer_token(Some(token)) else {
2613 bail!("bearer token for actor '{actor}' must not be blank");
2614 };
2615 if !seen_actors.insert(actor.clone()) {
2616 bail!("duplicate bearer token actor '{actor}'");
2617 }
2618 if !seen_tokens.insert(token.clone()) {
2619 bail!("duplicate bearer token value configured");
2620 }
2621 normalized.push((actor, token));
2622 }
2623
2624 normalized.sort_by(|(left, _), (right, _)| left.cmp(right));
2625 Ok(normalized)
2626}
2627
2628fn server_bearer_tokens_from_env() -> Result<Vec<(String, String)>> {
2629 let mut entries = Vec::new();
2630
2631 if let Some(token) = normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKEN").ok())
2632 {
2633 entries.push(("default".to_string(), token));
2634 }
2635
2636 if let Some(path) =
2637 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE").ok())
2638 {
2639 entries.extend(read_bearer_tokens_file(&path)?);
2640 } else if let Some(json) =
2641 normalize_bearer_token(std::env::var("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON").ok())
2642 {
2643 entries.extend(parse_bearer_tokens_json(&json)?);
2644 }
2645
2646 validate_bearer_tokens(entries)
2647}
2648
2649#[cfg(test)]
2650mod tests {
2651 use super::{
2652 GraphStartupConfig, ServerConfig, ServerConfigMode, ServerRuntimeState,
2653 classify_server_runtime_state, hash_bearer_token, load_server_settings,
2654 normalize_bearer_token, parse_bearer_tokens_json, serve, server_bearer_tokens_from_env,
2655 };
2656 use serial_test::serial;
2657 use std::env;
2658 use std::fs;
2659 use tempfile::tempdir;
2660
/// The digest returned for a token is 32 bytes long.
#[test]
fn hash_bearer_token_produces_32_byte_output() {
    assert_eq!(hash_bearer_token("any-token").len(), 32);
}
2666
/// Hashing the same input twice yields the same digest.
#[test]
fn hash_bearer_token_is_deterministic() {
    let first = hash_bearer_token("stable-input");
    let second = hash_bearer_token("stable-input");
    assert_eq!(first, second);
}
2674
/// Two different tokens do not share a digest.
#[test]
fn hash_bearer_token_differs_for_different_inputs() {
    let digest_a = hash_bearer_token("token-a");
    let digest_b = hash_bearer_token("token-b");
    assert_ne!(digest_a, digest_b);
}
2679
/// The digest of `"abc"` matches the published SHA-256 test vector.
#[test]
fn hash_bearer_token_matches_known_sha256_vector() {
    let digest = hash_bearer_token("abc");

    // Render the digest as lowercase hex, two characters per byte.
    let mut hex = String::with_capacity(64);
    for byte in digest.iter() {
        hex.push_str(&format!("{:02x}", byte));
    }

    assert_eq!(
        hex,
        "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
    );
}
2690
/// With no CLI overrides, the graph URI and bind address come from the YAML
/// config file.
///
/// Marked `#[serial]` because `load_server_settings` consults the process
/// environment (`unauthenticated_env_var_classification` shows its result
/// depends on `OMNIGRAPH_UNAUTHENTICATED`). Unmarked tests may run in
/// parallel with the `#[serial]` tests that mutate the environment through
/// `EnvGuard`, so this test must be serialized with them.
#[test]
#[serial]
fn server_settings_load_from_yaml_config() {
    let temp = tempdir().unwrap();
    let config = temp.path().join("omnigraph.yaml");
    fs::write(
        &config,
        r#"
graphs:
  local:
    uri: /tmp/demo.omni
server:
  graph: local
  bind: 0.0.0.0:9090
"#,
    )
    .unwrap();

    let settings = load_server_settings(Some(&config), None, None, None, false).unwrap();
    match &settings.mode {
        ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/demo.omni"),
        ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
    }
    assert_eq!(settings.bind, "0.0.0.0:9090");
}
2715
/// A URI and bind address passed explicitly win over the values in the YAML
/// config file.
///
/// Marked `#[serial]` because `load_server_settings` consults the process
/// environment (`unauthenticated_env_var_classification` shows its result
/// depends on `OMNIGRAPH_UNAUTHENTICATED`). Unmarked tests may run in
/// parallel with the `#[serial]` tests that mutate the environment through
/// `EnvGuard`, so this test must be serialized with them.
#[test]
#[serial]
fn server_settings_cli_flags_override_yaml_config() {
    let temp = tempdir().unwrap();
    let config = temp.path().join("omnigraph.yaml");
    fs::write(
        &config,
        r#"
graphs:
  local:
    uri: /tmp/demo.omni
server:
  graph: local
  bind: 127.0.0.1:8080
"#,
    )
    .unwrap();

    let settings = load_server_settings(
        Some(&config),
        Some("/tmp/override.omni".to_string()),
        None,
        Some("0.0.0.0:9999".to_string()),
        false,
    )
    .unwrap();
    match &settings.mode {
        ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "/tmp/override.omni"),
        ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
    }
    assert_eq!(settings.bind, "0.0.0.0:9999");
}
2747
/// Passing a graph name (`"dev"`) selects that entry's URI instead of the
/// graph named under `server.graph`.
///
/// Marked `#[serial]` because `load_server_settings` consults the process
/// environment (`unauthenticated_env_var_classification` shows its result
/// depends on `OMNIGRAPH_UNAUTHENTICATED`). Unmarked tests may run in
/// parallel with the `#[serial]` tests that mutate the environment through
/// `EnvGuard`, so this test must be serialized with them.
#[test]
#[serial]
fn server_settings_can_resolve_named_target() {
    let temp = tempdir().unwrap();
    let config = temp.path().join("omnigraph.yaml");
    fs::write(
        &config,
        r#"
graphs:
  local:
    uri: ./demo.omni
  dev:
    uri: http://127.0.0.1:8080
server:
  graph: local
  bind: 127.0.0.1:8080
"#,
    )
    .unwrap();

    let settings =
        load_server_settings(Some(&config), None, Some("dev".to_string()), None, false)
            .unwrap();
    match &settings.mode {
        ServerConfigMode::Single { uri, .. } => assert_eq!(uri, "http://127.0.0.1:8080"),
        ServerConfigMode::Multi { .. } => panic!("expected Single mode, got Multi"),
    }
}
2775
/// With no config file, URI, or graph name, loading settings fails with a
/// message containing "no graph to serve".
///
/// Marked `#[serial]` defensively: `load_server_settings` consults the
/// process environment on its success path, and unmarked tests may run in
/// parallel with the `#[serial]` tests that mutate the environment through
/// `EnvGuard`. NOTE(review): whether the failing path reaches that read is
/// not visible here — confirm, or keep the attribute as a cheap safeguard.
#[test]
#[serial]
fn server_settings_require_uri_from_cli_or_config() {
    let error = load_server_settings(None, None, None, None, false).unwrap_err();
    assert!(
        error.to_string().contains("no graph to serve"),
        "expected mode-inference error, got: {error}",
    );
}
2784
/// `(false, false, false)` is refused with a message that mentions
/// `--unauthenticated`, while `(false, false, true)` classifies as `Open`.
// NOTE(review): the arguments appear to be (tokens present, policy present,
// allow unauthenticated) — inferred from the test names; confirm against the
// function's signature.
#[test]
fn classify_open_requires_explicit_unauthenticated_flag() {
    let msg = classify_server_runtime_state(false, false, false)
        .unwrap_err()
        .to_string();
    assert!(
        msg.contains("--unauthenticated"),
        "expected refusal message mentioning --unauthenticated, got: {msg}"
    );

    let opted_in = classify_server_runtime_state(false, false, true).unwrap();
    assert_eq!(opted_in, ServerRuntimeState::Open);
}
2801
/// `(true, false, _)` classifies as `DefaultDeny` whatever the third flag is.
#[test]
fn classify_tokens_without_policy_is_default_deny() {
    for allow_unauthenticated in [false, true] {
        let state = classify_server_runtime_state(true, false, allow_unauthenticated).unwrap();
        assert_eq!(state, ServerRuntimeState::DefaultDeny);
    }
}
2816
2817 #[tokio::test]
2818 #[serial]
2819 async fn serve_refuses_to_start_with_policy_but_no_tokens_multi_mode() {
2820 let _guard = EnvGuard::set(&[
2829 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
2830 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
2831 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
2832 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
2833 ("OMNIGRAPH_UNAUTHENTICATED", None),
2834 ]);
2835 let temp = tempdir().unwrap();
2836 let policy_path = temp.path().join("server-policy.yaml");
2841 let config = ServerConfig {
2842 mode: ServerConfigMode::Multi {
2843 graphs: vec![GraphStartupConfig {
2844 graph_id: "alpha".to_string(),
2845 uri: temp
2846 .path()
2847 .join("alpha.omni")
2848 .to_string_lossy()
2849 .into_owned(),
2850 policy_file: None,
2851 }],
2852 config_path: temp.path().join("omnigraph.yaml"),
2853 server_policy_file: Some(policy_path),
2854 },
2855 bind: "127.0.0.1:0".to_string(),
2856 allow_unauthenticated: false,
2857 };
2858 let result = serve(config).await;
2859 let err = result
2860 .expect_err("serve should refuse to start in multi mode with policy but no tokens");
2861 let msg = format!("{:?}", err);
2862 assert!(
2863 msg.contains("policy file is configured but no bearer tokens"),
2864 "expected policy-without-tokens rejection in multi mode, got: {msg}",
2865 );
2866 }
2867
2868 #[tokio::test]
2869 #[serial]
2870 async fn serve_refuses_to_start_in_state_1_without_unauthenticated() {
2871 let _guard = EnvGuard::set(&[
2882 ("OMNIGRAPH_SERVER_BEARER_TOKEN", None),
2883 ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", None),
2884 ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
2885 ("OMNIGRAPH_SERVER_BEARER_TOKENS_AWS_SECRET", None),
2886 ("OMNIGRAPH_UNAUTHENTICATED", None),
2887 ]);
2888 let temp = tempdir().unwrap();
2889 let config = ServerConfig {
2892 mode: ServerConfigMode::Single {
2893 uri: temp
2894 .path()
2895 .join("graph.omni")
2896 .to_string_lossy()
2897 .into_owned(),
2898 policy_file: None,
2899 },
2900 bind: "127.0.0.1:0".to_string(),
2901 allow_unauthenticated: false,
2902 };
2903 let result = serve(config).await;
2904 let err =
2905 result.expect_err("serve should refuse to start in State 1 without --unauthenticated");
2906 let msg = format!("{:?}", err);
2907 assert!(
2908 msg.contains("no bearer tokens") || msg.contains("policy file"),
2909 "expected refusal message naming the misconfiguration, got: {msg}",
2910 );
2911 }
2912
/// Checks how `OMNIGRAPH_UNAUTHENTICATED` and the CLI flag combine into
/// `allow_unauthenticated`:
///
/// * `1`, `true`, `yes`, `TRUE`, and `anything` enable it;
/// * `0`, `false`, `FALSE`, the empty string, and an unset variable do not;
/// * passing `true` for the CLI flag enables it even when the variable is `0`.
#[test]
#[serial]
fn unauthenticated_env_var_classification() {
    let temp = tempdir().unwrap();
    let config_path = temp.path().join("omnigraph.yaml");
    fs::write(
        &config_path,
        r#"
graphs:
  local:
    uri: /tmp/demo-unauth.omni
server:
  graph: local
"#,
    )
    .unwrap();

    // Loads settings from the fixture; only the CLI flag varies between calls.
    let load = |cli_flag: bool| {
        load_server_settings(Some(&config_path), None, None, None, cli_flag)
            .expect("settings load should succeed")
    };

    for value in ["1", "true", "yes", "TRUE", "anything"] {
        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
        let settings = load(false);
        assert!(
            settings.allow_unauthenticated,
            "OMNIGRAPH_UNAUTHENTICATED={value:?} should enable Open mode",
        );
    }

    for value in ["0", "false", "FALSE", ""] {
        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some(value))]);
        let settings = load(false);
        assert!(
            !settings.allow_unauthenticated,
            "OMNIGRAPH_UNAUTHENTICATED={value:?} should NOT enable Open mode",
        );
    }

    {
        // Scoped so the variable is restored before the final case sets it again.
        let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", None)]);
        let settings = load(false);
        assert!(
            !settings.allow_unauthenticated,
            "OMNIGRAPH_UNAUTHENTICATED unset should NOT enable Open mode",
        );
    }

    let _guard = EnvGuard::set(&[("OMNIGRAPH_UNAUTHENTICATED", Some("0"))]);
    let settings = load(true);
    assert!(
        settings.allow_unauthenticated,
        "--unauthenticated CLI flag should win even when env is falsy",
    );
}
2981
/// `(true, true, _)` classifies as `PolicyEnabled` whatever the third flag is.
#[test]
fn classify_policy_enabled_requires_tokens() {
    for allow_unauthenticated in [false, true] {
        let state = classify_server_runtime_state(true, true, allow_unauthenticated).unwrap();
        assert_eq!(state, ServerRuntimeState::PolicyEnabled);
    }
}
2996
/// `(false, true, _)` is rejected for both values of the third flag, and the
/// message names both the misconfiguration and its consequence.
#[test]
fn classify_policy_without_tokens_is_rejected() {
    for allow_unauthenticated in [false, true] {
        let msg = classify_server_runtime_state(false, true, allow_unauthenticated)
            .unwrap_err()
            .to_string();

        let names_misconfiguration =
            msg.contains("policy file is configured but no bearer tokens");
        assert!(
            names_misconfiguration,
            "expected policy-without-tokens rejection message; got: {msg}"
        );

        let names_failure_mode = msg.contains("every request would 401");
        assert!(
            names_failure_mode,
            "rejection message must name the failure mode; got: {msg}"
        );
    }
}
3019
/// `None` and whitespace-only input normalize to `None`; padded input comes
/// back trimmed.
#[test]
fn normalize_bearer_token_trims_and_filters_blank_values() {
    let cases: [(Option<&str>, Option<&str>); 3] = [
        (None, None),
        (Some(" "), None),
        (Some(" demo-token "), Some("demo-token")),
    ];

    for (input, expected) in cases {
        let normalized = normalize_bearer_token(input.map(str::to_string));
        assert_eq!(normalized.as_deref(), expected);
    }
}
3029
/// Test helper that overrides environment variables and puts the previous
/// values back when dropped.
///
/// Previous values are captured with `env::var_os`, so a variable that held
/// non-Unicode data is restored verbatim rather than being treated as unset.
#[must_use = "the overrides are reverted as soon as the guard is dropped"]
struct EnvGuard {
    /// Variable name paired with the value it had before `set` ran
    /// (`None` when it was unset).
    saved: Vec<(&'static str, Option<std::ffi::OsString>)>,
}

impl EnvGuard {
    /// Applies `vars` to the process environment: `Some(value)` sets the
    /// variable, `None` removes it. All previous values are snapshotted
    /// before the first change is made.
    fn set(vars: &[(&'static str, Option<&str>)]) -> Self {
        let saved = vars
            .iter()
            .map(|(name, _)| (*name, env::var_os(name)))
            .collect::<Vec<_>>();
        for (name, value) in vars {
            // SAFETY: mutating the environment is only sound while no other
            // thread reads or writes it. Every test in this module that
            // builds an `EnvGuard` is marked `#[serial]`; callers must keep
            // that up for new tests.
            unsafe {
                match value {
                    Some(value) => env::set_var(name, value),
                    None => env::remove_var(name),
                }
            }
        }
        Self { saved }
    }
}

impl Drop for EnvGuard {
    /// Restores every variable touched by `set` to its snapshotted state.
    fn drop(&mut self) {
        for (name, value) in self.saved.drain(..) {
            // SAFETY: same requirement as in `set` — the guard is dropped
            // inside the `#[serial]` test that created it.
            unsafe {
                match value {
                    Some(value) => env::set_var(name, value),
                    None => env::remove_var(name),
                }
            }
        }
    }
}
3064
/// Parsing yields one pair per JSON entry and leaves token whitespace intact.
#[test]
fn parse_bearer_tokens_json_reads_actor_token_map() {
    let mut tokens =
        parse_bearer_tokens_json(r#"{"alice":" token-a ","bob":"token-b"}"#).unwrap();
    // The parser's output order is unspecified, so sort before comparing.
    tokens.sort();

    let expected = vec![
        ("alice".to_string(), " token-a ".to_string()),
        ("bob".to_string(), "token-b".to_string()),
    ];
    assert_eq!(tokens, expected);
}
3072
/// The legacy single-token variable and the tokens file are merged: the
/// legacy token appears trimmed under actor `"default"`, and the result is
/// ordered by actor name.
#[test]
#[serial]
fn server_bearer_tokens_from_env_reads_legacy_token_and_token_file() {
    let temp = tempdir().unwrap();
    let tokens_path = temp.path().join("tokens.json");
    fs::write(
        &tokens_path,
        r#"{"team-01":"token-one","team-02":"token-two"}"#,
    )
    .unwrap();

    let tokens_path_text = tokens_path.to_str().unwrap();
    let _guard = EnvGuard::set(&[
        ("OMNIGRAPH_SERVER_BEARER_TOKEN", Some(" legacy-token ")),
        ("OMNIGRAPH_SERVER_BEARER_TOKENS_FILE", Some(tokens_path_text)),
        ("OMNIGRAPH_SERVER_BEARER_TOKENS_JSON", None),
    ]);

    let expected: Vec<(String, String)> = [
        ("default", "legacy-token"),
        ("team-01", "token-one"),
        ("team-02", "token-two"),
    ]
    .into_iter()
    .map(|(actor, token)| (actor.to_string(), token.to_string()))
    .collect();

    let tokens = server_bearer_tokens_from_env().unwrap();
    assert_eq!(tokens, expected);
}
3103}