use super::*;
#[utoipa::path(
get,
path = "/healthz",
tag = "health",
operation_id = "health",
responses(
(status = 200, description = "Server is healthy", body = HealthOutput),
),
)]
pub(crate) async fn server_health() -> Json<HealthOutput> {
Json(HealthOutput {
status: "ok".to_string(),
version: SERVER_VERSION.to_string(),
source_version: SERVER_SOURCE_VERSION.map(str::to_string),
})
}
#[utoipa::path(
get,
path = "/graphs",
tag = "management",
operation_id = "listGraphs",
responses(
(status = 200, description = "List of registered graphs", body = GraphListResponse),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 405, description = "Method not allowed (single-graph mode)", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_graphs_list(
State(state): State<AppState>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<GraphListResponse>, ApiError> {
let registry = &state.routing().registry;
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
state.server_policy.as_deref(),
PolicyRequest {
action: PolicyAction::GraphList,
branch: None,
target_branch: None,
},
)?;
let mut graphs: Vec<GraphInfo> = registry
.list()
.into_iter()
.map(|handle| GraphInfo {
graph_id: handle.key.graph_id.as_str().to_string(),
uri: handle.uri.clone(),
})
.collect();
graphs.sort_by(|a, b| a.graph_id.cmp(&b.graph_id));
Ok(Json(GraphListResponse { graphs }))
}
pub(crate) async fn server_openapi(State(state): State<AppState>) -> Json<utoipa::openapi::OpenApi> {
let mut doc = crate::served_openapi();
if !state.requires_bearer_auth() {
strip_security(&mut doc);
}
Json(doc)
}
const CLUSTER_PATH_PREFIX: &str = "/graphs/{graph_id}";
const CLUSTER_OPERATION_ID_PREFIX: &str = "cluster_";
const ALWAYS_FLAT_PATHS: &[&str] = &["/healthz", "/graphs"];
pub(crate) fn nest_paths_under_cluster_prefix(doc: &mut utoipa::openapi::OpenApi) {
let original = std::mem::take(&mut doc.paths.paths);
let mut rewritten = std::collections::BTreeMap::new();
for (path, mut item) in original {
if ALWAYS_FLAT_PATHS.contains(&path.as_str()) {
rewritten.insert(path, item);
continue;
}
rename_operation_ids(&mut item, CLUSTER_OPERATION_ID_PREFIX);
add_cluster_graph_id_parameter(&mut item);
let new_path = format!("{CLUSTER_PATH_PREFIX}{path}");
rewritten.insert(new_path, item);
}
doc.paths.paths = rewritten;
}
pub(crate) fn add_cluster_graph_id_parameter(item: &mut utoipa::openapi::PathItem) {
for op in path_item_operations_mut(item) {
let parameters = op.parameters.get_or_insert_with(Vec::new);
let has_graph_id = parameters
.iter()
.any(|param| param.name == "graph_id" && param.parameter_in == ParameterIn::Path);
if !has_graph_id {
parameters.insert(0, graph_id_path_parameter());
}
}
}
pub(crate) fn graph_id_path_parameter() -> Parameter {
let mut parameter = Parameter::new("graph_id");
parameter.parameter_in = ParameterIn::Path;
parameter.description = Some("Graph id to route the request to.".to_string());
parameter.schema = Some(Object::with_type(Type::String).into());
parameter
}
pub(crate) fn rename_operation_ids(item: &mut utoipa::openapi::PathItem, prefix: &str) {
for op in path_item_operations_mut(item) {
if let Some(id) = op.operation_id.as_deref() {
op.operation_id = Some(format!("{prefix}{id}"));
}
}
}
pub(crate) fn path_item_operations_mut(
item: &mut utoipa::openapi::PathItem,
) -> impl Iterator<Item = &mut utoipa::openapi::path::Operation> {
[
item.get.as_mut(),
item.post.as_mut(),
item.put.as_mut(),
item.delete.as_mut(),
item.options.as_mut(),
item.head.as_mut(),
item.patch.as_mut(),
item.trace.as_mut(),
]
.into_iter()
.flatten()
}
pub(crate) fn strip_security(doc: &mut utoipa::openapi::OpenApi) {
if let Some(components) = doc.components.as_mut() {
components.security_schemes.clear();
}
for path_item in doc.paths.paths.values_mut() {
for op in [
path_item.get.as_mut(),
path_item.post.as_mut(),
path_item.put.as_mut(),
path_item.delete.as_mut(),
path_item.options.as_mut(),
path_item.head.as_mut(),
path_item.patch.as_mut(),
path_item.trace.as_mut(),
]
.into_iter()
.flatten()
{
op.security = None;
}
}
}
pub(crate) async fn require_bearer_auth(
State(state): State<AppState>,
mut request: Request,
next: Next,
) -> std::result::Result<Response, ApiError> {
if !state.requires_bearer_auth() {
return Ok(next.run(request).await);
}
let Some(header) = request
.headers()
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok())
else {
return Err(ApiError::unauthorized("missing bearer token"));
};
let Some(provided_token) = header.strip_prefix("Bearer ") else {
return Err(ApiError::unauthorized("missing bearer token"));
};
let Some(actor) = state.authenticate_bearer_token(provided_token) else {
return Err(ApiError::unauthorized("invalid bearer token"));
};
request.extensions_mut().insert(actor);
Ok(next.run(request).await)
}
pub(crate) async fn resolve_graph_handle(
State(state): State<AppState>,
mut request: Request,
next: Next,
) -> std::result::Result<Response, ApiError> {
let registry = &state.routing.registry;
let original_path: String = request
.extensions()
.get::<OriginalUri>()
.map(|OriginalUri(uri)| uri.path().to_string())
.unwrap_or_else(|| request.uri().path().to_string());
let graph_id_str = original_path
.strip_prefix("/graphs/")
.and_then(|rest| rest.split('/').next())
.filter(|s| !s.is_empty())
.ok_or_else(|| {
ApiError::bad_request("cluster route missing /graphs/{graph_id} prefix".to_string())
})?;
let graph_id = GraphId::try_from(graph_id_str.to_string())
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let key = GraphKey::cluster(graph_id.clone());
let handle = match registry.get(&key) {
RegistryLookup::Ready(handle) => handle,
RegistryLookup::Gone => {
return Err(ApiError::not_found(format!("graph '{graph_id}' not found")));
}
};
info!(graph_id = %handle.key.graph_id, "graph routed");
request.extensions_mut().insert(handle);
Ok(next.run(request).await)
}
pub(crate) fn log_policy_decision(actor_id: &str, request: &PolicyRequest, decision: &PolicyDecision) {
info!(
actor_id = actor_id,
action = %request.action,
branch = request.branch.as_deref().unwrap_or(""),
target_branch = request.target_branch.as_deref().unwrap_or(""),
allowed = decision.allowed,
matched_rule_id = decision.matched_rule_id.as_deref().unwrap_or(""),
"policy decision"
);
}
pub(crate) enum Authz {
Allowed,
Denied(String),
}
pub(crate) fn authorize(
actor: Option<&ResolvedActor>,
policy: Option<&PolicyEngine>,
request: PolicyRequest,
) -> std::result::Result<Authz, ApiError> {
let Some(engine) = policy else {
if request.action.resource_kind() == PolicyResourceKind::Server {
return Ok(Authz::Denied(
"server-scoped actions require an explicit cluster policy bundle \
applied with `omnigraph cluster apply` and served after restart — \
the management surface is closed by default in every runtime state, \
including --unauthenticated, so that server topology is never exposed \
without operator opt-in."
.to_string(),
));
}
if actor.is_some() && request.action != PolicyAction::Read {
return Ok(Authz::Denied(
"server runs in default-deny mode (bearer tokens configured but no \
applied policy bundle). Only `read` actions are permitted; configure \
a graph or cluster policy bundle in the cluster config, run \
`omnigraph cluster apply`, and restart the server to enable other actions."
.to_string(),
));
}
return Ok(Authz::Allowed);
};
let Some(actor) = actor else {
return Err(ApiError::unauthorized("missing bearer token"));
};
let actor_id = actor.actor_id.as_ref();
let decision = engine
.authorize(actor_id, &request)
.map_err(|err| ApiError::internal(format!("policy: {err}")))?;
log_policy_decision(actor_id, &request, &decision);
if decision.allowed {
Ok(Authz::Allowed)
} else {
Ok(Authz::Denied(decision.message))
}
}
pub(crate) fn authorize_request(
actor: Option<&ResolvedActor>,
policy: Option<&PolicyEngine>,
request: PolicyRequest,
) -> std::result::Result<(), ApiError> {
match authorize(actor, policy, request)? {
Authz::Allowed => Ok(()),
Authz::Denied(message) => Err(ApiError::forbidden(message)),
}
}
#[utoipa::path(
get,
path = "/snapshot",
tag = "snapshots",
operation_id = "getSnapshot",
params(SnapshotQuery),
responses(
(status = 200, description = "Database snapshot", body = api::SnapshotOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_snapshot(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Query(query): Query<SnapshotQuery>,
) -> std::result::Result<Json<api::SnapshotOutput>, ApiError> {
let branch = query.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let snapshot = {
let db = &handle.engine;
db.snapshot_of(ReadTarget::branch(branch.as_str()))
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(snapshot_payload(&branch, &snapshot)))
}
pub(crate) fn deprecation_headers(successor_link: &'static str) -> [(HeaderName, HeaderValue); 2] {
[
(
HeaderName::from_static("deprecation"),
HeaderValue::from_static("true"),
),
(
HeaderName::from_static("link"),
HeaderValue::from_static(successor_link),
),
]
}
#[utoipa::path(
post,
path = "/read",
tag = "queries",
operation_id = "read",
request_body = ReadRequest,
responses(
(status = 200, description = "Query results (response includes `Deprecation: true` + `Link: <query>; rel=\"successor-version\"`)", body = ReadOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /query instead; /read is kept indefinitely for byte-stable back-compat")]
pub(crate) async fn server_read(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ReadRequest>,
) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ReadOutput>), ApiError> {
let (selected_name, target, result) = run_query(
handle,
actor.as_ref().map(|Extension(actor)| actor),
&request.query_source,
request.query_name.as_deref(),
request.params.as_ref(),
request.branch,
request.snapshot,
false, )
.await?;
Ok((
deprecation_headers("<query>; rel=\"successor-version\""),
Json(api::read_output(selected_name, &target, result)),
))
}
#[utoipa::path(
post,
path = "/query",
tag = "queries",
operation_id = "query",
request_body = QueryRequest,
responses(
(status = 200, description = "Query results", body = ReadOutput),
(status = 400, description = "Bad request - also returned when the query body contains mutations; use POST /mutate (or its deprecated alias POST /change) for write queries", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_query(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<QueryRequest>,
) -> std::result::Result<Json<ReadOutput>, ApiError> {
let (selected_name, target, result) = run_query(
handle,
actor.as_ref().map(|Extension(actor)| actor),
&request.query,
request.name.as_deref(),
request.params.as_ref(),
request.branch,
request.snapshot,
true, )
.await?;
Ok(Json(api::read_output(selected_name, &target, result)))
}
#[utoipa::path(
post,
path = "/export",
tag = "queries",
operation_id = "export",
request_body = ExportRequest,
responses(
(status = 200, description = "Exported data as NDJSON", content_type = "application/x-ndjson"),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_export(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ExportRequest>,
) -> std::result::Result<Response, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Export,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let engine = Arc::clone(&handle.engine);
let type_names = request.type_names.clone();
let table_keys = request.table_keys.clone();
let (tx, rx) = mpsc::unbounded_channel::<std::result::Result<Bytes, io::Error>>();
tokio::spawn(async move {
let result = {
let mut writer = ExportStreamWriter { sender: tx.clone() };
engine
.export_jsonl_to_writer(&branch, &type_names, &table_keys, &mut writer)
.await
};
if let Err(err) = result {
let _ = tx.send(Err(io::Error::other(err.to_string())));
}
});
let body = Body::from_stream(stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
}));
Ok((
StatusCode::OK,
[(CONTENT_TYPE, "application/x-ndjson; charset=utf-8")],
body,
)
.into_response())
}
pub(crate) async fn run_mutate(
state: AppState,
handle: Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
query: &str,
name: Option<&str>,
params_json: Option<&Value>,
branch: String,
) -> std::result::Result<ChangeOutput, ApiError> {
let actor_arc = actor
.map(|a| Arc::clone(&a.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.map(|a| a.actor_id.as_ref());
authorize_request(
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Change,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let est_bytes = query.len() as u64
+ params_json
.map(|p| p.to_string().len() as u64)
.unwrap_or(0);
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let (selected_name, query_params) =
select_named_query(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
let params = query_params_from_json(&query_params, params_json)
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let db = &handle.engine;
db.mutate_as(&branch, query, &selected_name, ¶ms, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(ChangeOutput {
branch,
query_name: selected_name,
affected_nodes: result.affected_nodes,
affected_edges: result.affected_edges,
actor_id: actor_id.map(str::to_string),
})
}
pub(crate) async fn run_query(
handle: Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
query: &str,
name: Option<&str>,
params_json: Option<&Value>,
branch: Option<String>,
snapshot: Option<String>,
reject_mutations: bool,
) -> std::result::Result<(String, ReadTarget, omnigraph_compiler::result::QueryResult), ApiError> {
if branch.is_some() && snapshot.is_some() {
return Err(ApiError::bad_request(
"request may specify branch or snapshot, not both",
));
}
let target = read_target_from_request(branch, snapshot);
let policy_branch = match &target {
ReadTarget::Branch(branch) => Some(branch.clone()),
ReadTarget::Snapshot(_) if handle.policy.is_some() && actor.is_some() => {
let db = &handle.engine;
db.resolved_branch_of(target.clone())
.await
.map(|branch| branch.or_else(|| Some("main".to_string())))
.map_err(ApiError::from_omni)?
}
ReadTarget::Snapshot(_) => None,
};
authorize_request(
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: policy_branch,
target_branch: None,
},
)?;
let query_decl =
select_named_query_decl(query, name).map_err(|err| ApiError::bad_request(err.to_string()))?;
if reject_mutations && !query_decl.mutations.is_empty() {
return Err(ApiError::bad_request(format!(
"query '{}' contains mutations (insert/update/delete); use POST /mutate for write queries",
query_decl.name
)));
}
let selected_name = query_decl.name.clone();
let params = query_params_from_json(&query_decl.params, params_json)
.map_err(|err| ApiError::bad_request(err.to_string()))?;
let result = {
let db = &handle.engine;
db.query(target.clone(), query, &selected_name, ¶ms)
.await
.map_err(ApiError::from_omni)?
};
Ok((selected_name, target, result))
}
#[utoipa::path(
post,
path = "/change",
tag = "mutations",
operation_id = "change",
request_body = ChangeRequest,
responses(
(status = 200, description = "Mutation results (response includes `Deprecation: true` + `Link: <mutate>; rel=\"successor-version\"`)", body = ChangeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /mutate instead; /change is kept indefinitely for back-compat")]
pub(crate) async fn server_change(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ChangeRequest>,
) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<ChangeOutput>), ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let output = run_mutate(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
&request.query,
request.name.as_deref(),
request.params.as_ref(),
branch,
)
.await?;
Ok((
deprecation_headers("<mutate>; rel=\"successor-version\""),
Json(output),
))
}
#[utoipa::path(
post,
path = "/mutate",
tag = "mutations",
operation_id = "mutate",
request_body = ChangeRequest,
responses(
(status = 200, description = "Mutation results", body = ChangeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_mutate(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<ChangeRequest>,
) -> std::result::Result<Json<ChangeOutput>, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
Ok(Json(
run_mutate(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
&request.query,
request.name.as_deref(),
request.params.as_ref(),
branch,
)
.await?,
))
}
#[derive(Deserialize)]
pub(crate) struct QueryNamePath {
name: String,
}
pub(crate) fn parse_optional_invoke_body(
body: Bytes,
) -> std::result::Result<InvokeStoredQueryRequest, ApiError> {
if body.is_empty() {
return Ok(InvokeStoredQueryRequest::default());
}
serde_json::from_slice::<Option<InvokeStoredQueryRequest>>(&body)
.map(|request| request.unwrap_or_default())
.map_err(|err| {
ApiError::bad_request(format!("invalid stored-query invocation body: {err}"))
})
}
#[utoipa::path(
post,
path = "/queries/{name}",
tag = "queries",
operation_id = "invoke_query",
params(("name" = String, Path, description = "Stored query name (the registry key)")),
request_body = Option<InvokeStoredQueryRequest>,
responses(
(status = 200, description = "Read envelope (ReadOutput) or mutation envelope (ChangeOutput), serialized untagged", body = InvokeStoredQueryResponse),
(status = 400, description = "Bad request (param type error; snapshot on a stored mutation)", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden (the inner `change` gate for a stored mutation)", body = ErrorOutput),
(status = 404, description = "Unknown stored query, or `invoke_query` denied — indistinguishable to a caller without the grant", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
(status = 500, description = "Policy evaluation error (a denial is reported as 404, not 500)", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_invoke_query(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(QueryNamePath { name }): Path<QueryNamePath>,
body: Bytes,
) -> std::result::Result<Json<InvokeStoredQueryResponse>, ApiError> {
let req = parse_optional_invoke_body(body)?;
const NOT_FOUND: &str = "stored query not found";
let actor_ref = actor.as_ref().map(|Extension(actor)| actor);
match authorize(
actor_ref,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::InvokeQuery,
branch: None,
target_branch: None,
},
)? {
Authz::Allowed => {}
Authz::Denied(_) => return Err(ApiError::not_found(NOT_FOUND)),
}
let stored = handle
.queries
.as_ref()
.and_then(|registry| registry.lookup(&name))
.ok_or_else(|| ApiError::not_found(NOT_FOUND))?;
let source = Arc::clone(&stored.source);
let query_name = stored.name.clone();
let is_mutation = stored.is_mutation();
if let Some(expected) = req.expect_mutation {
if expected != is_mutation {
let (actual, verb) = if is_mutation {
("mutation", "mutate")
} else {
("read", "query")
};
return Err(ApiError::bad_request(format!(
"'{query_name}' is a {actual} — use omnigraph {verb} {query_name}"
)));
}
}
info!(
graph = %handle.uri,
actor = ?actor_ref.map(|a| a.actor_id.as_ref()),
query = %query_name,
kind = if is_mutation { "mutate" } else { "read" },
"stored query invoked"
);
if is_mutation {
if req.snapshot.is_some() {
return Err(ApiError::bad_request(
"stored mutation cannot target a snapshot",
));
}
let branch = req.branch.unwrap_or_else(|| "main".to_string());
let output = run_mutate(
state,
handle,
actor_ref,
&source,
Some(&query_name),
req.params.as_ref(),
branch,
)
.await?;
Ok(Json(InvokeStoredQueryResponse::Change(output)))
} else {
let (selected, target, result) = run_query(
handle,
actor_ref,
&source,
Some(&query_name),
req.params.as_ref(),
req.branch,
req.snapshot,
true,
)
.await?;
Ok(Json(InvokeStoredQueryResponse::Read(api::read_output(
selected, &target, result,
))))
}
}
#[utoipa::path(
get,
path = "/queries",
tag = "queries",
operation_id = "list_queries",
responses(
(status = 200, description = "Stored-query catalog (the mcp.expose subset, with typed params)", body = QueriesCatalogOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_list_queries(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<QueriesCatalogOutput>, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: Some("main".to_string()),
target_branch: None,
},
)?;
let queries = match handle.queries.as_ref() {
Some(registry) => registry
.iter()
.filter(|q| q.expose)
.map(api::query_catalog_entry)
.collect(),
None => Vec::new(),
};
Ok(Json(QueriesCatalogOutput { queries }))
}
#[utoipa::path(
get,
path = "/schema",
tag = "schema",
operation_id = "getSchema",
responses(
(status = 200, description = "Current schema source", body = SchemaOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_schema_get(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<SchemaOutput>, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let schema_source = {
let db = &handle.engine;
db.schema_source().to_string()
};
Ok(Json(SchemaOutput { schema_source }))
}
#[utoipa::path(
post,
path = "/schema/apply",
tag = "mutations",
operation_id = "applySchema",
request_body = SchemaApplyRequest,
responses(
(status = 200, description = "Schema apply results", body = SchemaApplyOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Schema apply is disabled for cluster-backed serving; use `omnigraph cluster apply` and restart", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_schema_apply(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<SchemaApplyRequest>,
) -> std::result::Result<Json<SchemaApplyOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::SchemaApply,
branch: None,
target_branch: Some("main".to_string()),
},
)?;
if state.routing().config_path.is_some() {
return Err(ApiError::conflict(
"server-side schema apply is disabled for cluster-backed serving; \
update the cluster config, run `omnigraph cluster apply`, and restart \
the server.",
));
}
let est_bytes = request.schema_source.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &handle.engine;
let registry = handle.queries.as_deref();
let label = handle.key.graph_id.as_str().to_string();
db.apply_schema_as_with_catalog_check(
&request.schema_source,
omnigraph::db::SchemaApplyOptions {
allow_data_loss: request.allow_data_loss,
},
actor_id,
|catalog| {
if let Some(registry) = registry {
validate_registry_against_catalog(registry, catalog, &label)?;
}
Ok(())
},
)
.await
.map_err(ApiError::from_omni)?
};
if result.applied {
let engine = Arc::clone(&handle.engine);
tokio::spawn(async move {
if let Err(err) = engine.ensure_indices().await {
tracing::warn!(
target: "omnigraph::server",
error = %err,
"post-apply ensure_indices failed; indexes will converge on the next optimize",
);
}
});
}
Ok(Json(schema_apply_output(handle.uri.as_str(), result)))
}
async fn run_ingest(
state: AppState,
handle: Arc<GraphHandle>,
actor: Option<&ResolvedActor>,
request: IngestRequest,
) -> std::result::Result<IngestOutput, ApiError> {
let branch = request.branch.unwrap_or_else(|| "main".to_string());
let from = request.from;
let mode = request.mode.unwrap_or(omnigraph::loader::LoadMode::Merge);
let actor_arc = actor
.map(|actor| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor.map(|actor| actor.actor_id.as_ref());
let branch_exists = {
let db = &handle.engine;
db.branch_list()
.await
.map_err(ApiError::from_omni)?
.into_iter()
.any(|name| name == branch)
};
if !branch_exists {
match from.as_deref() {
None => {
return Err(ApiError::not_found(format!(
"branch '{branch}' not found; pass `from` to create it"
)));
}
Some(from) => authorize_request(
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.to_string()),
target_branch: Some(branch.clone()),
},
)?,
}
}
authorize_request(
actor,
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Change,
branch: Some(branch.clone()),
target_branch: None,
},
)?;
let est_bytes = request.data.len() as u64;
let _admission = state
.workload
.try_admit(&actor_arc, est_bytes)
.map_err(ApiError::from_workload_reject)?;
let result = {
let db = &handle.engine;
db.load_as(&branch, from.as_deref(), &request.data, mode, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(ingest_output(
handle.uri.as_str(),
&result,
mode,
actor_id.map(str::to_string),
))
}
#[utoipa::path(
post,
path = "/load",
tag = "mutations",
operation_id = "load",
request_body = IngestRequest,
responses(
(status = 200, description = "Load results", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_load(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<Json<IngestOutput>, ApiError> {
Ok(Json(
run_ingest(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
request,
)
.await?,
))
}
#[utoipa::path(
post,
path = "/ingest",
tag = "mutations",
operation_id = "ingest",
request_body = IngestRequest,
responses(
(status = 200, description = "Load results (response includes `Deprecation: true` + `Link: <load>; rel=\"successor-version\"`)", body = IngestOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
#[deprecated(note = "use POST /load instead; /ingest is kept indefinitely for back-compat")]
pub(crate) async fn server_ingest(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<IngestRequest>,
) -> std::result::Result<([(HeaderName, HeaderValue); 2], Json<IngestOutput>), ApiError> {
let output = run_ingest(
state,
handle,
actor.as_ref().map(|Extension(actor)| actor),
request,
)
.await?;
Ok((
deprecation_headers("<load>; rel=\"successor-version\""),
Json(output),
))
}
#[utoipa::path(
get,
path = "/branches",
tag = "branches",
operation_id = "listBranches",
responses(
(status = 200, description = "List of branches", body = BranchListOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_branch_list(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
) -> std::result::Result<Json<BranchListOutput>, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let mut branches = {
let db = &handle.engine;
db.branch_list().await.map_err(ApiError::from_omni)?
};
branches.sort();
Ok(Json(BranchListOutput { branches }))
}
#[utoipa::path(
post,
path = "/branches",
tag = "branches",
operation_id = "createBranch",
request_body = BranchCreateRequest,
responses(
(status = 200, description = "Branch created", body = BranchCreateOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Branch already exists", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_branch_create(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<BranchCreateRequest>,
) -> std::result::Result<Json<BranchCreateOutput>, ApiError> {
let from = request.from.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchCreate,
branch: Some(from.clone()),
target_branch: Some(request.name.clone()),
},
)?;
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &handle.engine;
db.branch_create_from_as(
ReadTarget::branch(&from),
&request.name,
actor.as_ref().map(|Extension(a)| a.actor_id.as_ref()),
)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchCreateOutput {
uri: handle.uri.clone(),
from,
name: request.name,
actor_id: actor.map(|Extension(actor)| actor.actor_id.as_ref().to_string()),
}))
}
#[derive(Deserialize)]
pub(crate) struct BranchPath {
branch: String,
}
#[utoipa::path(
delete,
path = "/branches/{branch}",
tag = "branches",
operation_id = "deleteBranch",
params(
("branch" = String, Path, description = "Branch name to delete"),
),
responses(
(status = 200, description = "Branch deleted", body = BranchDeleteOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 404, description = "Branch not found", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_branch_delete(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(BranchPath { branch }): Path<BranchPath>,
) -> std::result::Result<Json<BranchDeleteOutput>, ApiError> {
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchDelete,
branch: None,
target_branch: Some(branch.clone()),
},
)?;
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
{
let db = &handle.engine;
db.branch_delete_as(&branch, actor_id)
.await
.map_err(ApiError::from_omni)?;
}
Ok(Json(BranchDeleteOutput {
uri: handle.uri.clone(),
name: branch,
actor_id: actor_id.map(str::to_string),
}))
}
#[utoipa::path(
post,
path = "/branches/merge",
tag = "branches",
operation_id = "mergeBranches",
request_body = BranchMergeRequest,
responses(
(status = 200, description = "Branches merged", body = BranchMergeOutput),
(status = 400, description = "Bad request", body = ErrorOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 409, description = "Merge conflict", body = ErrorOutput),
(status = 429, description = "Per-actor admission cap exceeded; honor `Retry-After` header", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_branch_merge(
State(state): State<AppState>,
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Json(request): Json<BranchMergeRequest>,
) -> std::result::Result<Json<BranchMergeOutput>, ApiError> {
let target = request.target.unwrap_or_else(|| "main".to_string());
let actor_arc = actor
.as_ref()
.map(|Extension(actor)| Arc::clone(&actor.actor_id))
.unwrap_or_else(|| Arc::<str>::from("anonymous"));
let actor_id = actor
.as_ref()
.map(|Extension(actor)| actor.actor_id.as_ref());
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::BranchMerge,
branch: Some(request.source.clone()),
target_branch: Some(target.clone()),
},
)?;
let _admission = state
.workload
.try_admit(&actor_arc, 256)
.map_err(ApiError::from_workload_reject)?;
let outcome = {
let db = &handle.engine;
db.branch_merge_as(&request.source, &target, actor_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(BranchMergeOutput {
source: request.source,
target,
outcome: outcome.into(),
actor_id: actor_id.map(str::to_string),
}))
}
#[utoipa::path(
get,
path = "/commits",
tag = "commits",
operation_id = "listCommits",
params(CommitListQuery),
responses(
(status = 200, description = "List of commits", body = CommitListOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_commit_list(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Query(query): Query<CommitListQuery>,
) -> std::result::Result<Json<CommitListOutput>, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: query.branch.clone(),
target_branch: None,
},
)?;
let commits = {
let db = &handle.engine;
db.list_commits(query.branch.as_deref())
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(CommitListOutput {
commits: commits.iter().map(api::commit_output).collect(),
}))
}
#[derive(Deserialize)]
pub(crate) struct CommitPath {
commit_id: String,
}
#[utoipa::path(
get,
path = "/commits/{commit_id}",
tag = "commits",
operation_id = "getCommit",
params(
("commit_id" = String, Path, description = "Commit identifier"),
),
responses(
(status = 200, description = "Commit details", body = api::CommitOutput),
(status = 401, description = "Unauthorized", body = ErrorOutput),
(status = 403, description = "Forbidden", body = ErrorOutput),
(status = 404, description = "Commit not found", body = ErrorOutput),
),
security(("bearer_token" = [])),
)]
pub(crate) async fn server_commit_show(
Extension(handle): Extension<Arc<GraphHandle>>,
actor: Option<Extension<ResolvedActor>>,
Path(CommitPath { commit_id }): Path<CommitPath>,
) -> std::result::Result<Json<api::CommitOutput>, ApiError> {
authorize_request(
actor.as_ref().map(|Extension(actor)| actor),
handle.policy.as_deref(),
PolicyRequest {
action: PolicyAction::Read,
branch: None,
target_branch: None,
},
)?;
let commit = {
let db = &handle.engine;
db.get_commit(&commit_id)
.await
.map_err(ApiError::from_omni)?
};
Ok(Json(api::commit_output(&commit)))
}
pub(crate) fn read_target_from_request(branch: Option<String>, snapshot: Option<String>) -> ReadTarget {
if let Some(snapshot) = snapshot {
ReadTarget::snapshot(omnigraph::db::SnapshotId::new(snapshot))
} else {
ReadTarget::branch(branch.unwrap_or_else(|| "main".to_string()))
}
}
pub(crate) fn select_named_query_decl(
query_source: &str,
requested_name: Option<&str>,
) -> Result<omnigraph_compiler::query::ast::QueryDecl> {
let parsed = parse_query(query_source)?;
let query = if let Some(name) = requested_name {
parsed
.queries
.into_iter()
.find(|query| query.name == name)
.ok_or_else(|| color_eyre::eyre::eyre!("query '{}' not found", name))?
} else if parsed.queries.len() == 1 {
parsed.queries.into_iter().next().unwrap()
} else {
bail!("query file contains multiple queries; pass --name");
};
Ok(query)
}
pub(crate) fn select_named_query(
query_source: &str,
requested_name: Option<&str>,
) -> Result<(String, Vec<omnigraph_compiler::query::ast::Param>)> {
let query = select_named_query_decl(query_source, requested_name)?;
Ok((query.name, query.params))
}
pub(crate) fn query_params_from_json(
query_params: &[omnigraph_compiler::query::ast::Param],
params_json: Option<&Value>,
) -> Result<ParamMap> {
json_params_to_param_map(params_json, query_params, JsonParamMode::Standard)
.map_err(|err| color_eyre::eyre::eyre!(err.to_string()))
}