use std::io::Write;
use color_eyre::Result;
use color_eyre::eyre::bail;
use omnigraph::db::{Omnigraph, ReadTarget};
use omnigraph_api_types::{
BranchCreateOutput, BranchCreateRequest, BranchDeleteOutput, BranchListOutput,
BranchMergeOutput, BranchMergeRequest, ChangeOutput, CommitListOutput, CommitOutput,
ErrorOutput, ExportRequest, GraphListResponse, IngestOutput, IngestRequest,
InvokeStoredQueryRequest, ReadOutput,
ReadRequest, SchemaApplyOutput, SchemaApplyRequest, SchemaOutput, SnapshotOutput, commit_output,
ingest_output, read_output, schema_apply_output, snapshot_payload,
};
use omnigraph_compiler::catalog::Catalog;
use reqwest::Method;
use serde_json::Value;
use crate::cli::CliLoadMode;
use crate::helpers::{
apply_bearer_token, apply_server_flag, build_http_client, is_remote_uri,
legacy_change_request_body, query_params_from_json,
remote_json, remote_url, resolve_cli_actor, resolve_cli_graph, resolve_remote_bearer_token,
resolve_server_flag, select_named_query,
};
use crate::output::{LoadOutput, load_output_from_result, load_output_from_tables};
pub(crate) enum GraphClient {
Embedded {
uri: String,
actor: Option<String>,
},
Remote {
http: reqwest::Client,
base_url: String,
token: Option<String>,
},
}
async fn require_graph_for_multi_graph_server(
scope: &crate::scope::ResolvedScope,
) -> Result<()> {
let (Some(server), None) = (scope.server.as_deref(), scope.graph.as_deref()) else {
return Ok(());
};
let Some(base) = resolve_server_flag(Some(server), None)? else {
return Ok(());
};
let token = resolve_remote_bearer_token(Some(&base))?;
let probe = GraphClient::Remote {
http: build_http_client()?,
base_url: base,
token,
};
if let Ok(resp) = probe.list_graphs().await {
if !resp.graphs.is_empty() {
let ids: Vec<&str> = resp.graphs.iter().map(|g| g.graph_id.as_str()).collect();
bail!(
"server scope '{server}' has {} {}: [{}]; pass --graph <id> to select one \
(or set `default_graph` in your operator config)",
ids.len(),
if ids.len() == 1 { "graph" } else { "graphs" },
ids.join(", ")
);
}
}
Ok(())
}
fn reject_positional_remote(via_server: bool, uri: &str) -> Result<()> {
if !via_server && is_remote_uri(uri) {
bail!(
"a remote graph must be addressed with `--server <url>` — a positional \
(or `--uri`) http(s):// URL no longer dispatches to a server"
);
}
Ok(())
}
impl GraphClient {
pub(crate) async fn resolve(
server: Option<&str>,
graph: Option<&str>,
uri: Option<String>,
profile: Option<&str>,
store: Option<&str>,
) -> Result<Self> {
let scope = crate::scope::resolve_scope(
&crate::operator::load_operator_config()?,
crate::planes::Capability::Any,
crate::scope::ScopeFlags { profile, store, server, cluster: None, graph, uri },
)?;
require_graph_for_multi_graph_server(&scope).await?;
let (server, graph, uri) = (
scope.server.as_deref(),
scope.graph.as_deref(),
scope.uri,
);
let via_server = server.is_some();
let uri = apply_server_flag(server, graph, uri)?;
let token = resolve_remote_bearer_token(uri.as_deref())?;
let uri = crate::helpers::resolve_uri(uri)?;
reject_positional_remote(via_server, &uri)?;
if is_remote_uri(&uri) {
Ok(GraphClient::Remote {
http: build_http_client()?,
base_url: uri,
token,
})
} else {
Ok(GraphClient::Embedded { uri, actor: None })
}
}
pub(crate) async fn resolve_with_policy(
server: Option<&str>,
graph: Option<&str>,
uri: Option<String>,
cli_as: Option<&str>,
profile: Option<&str>,
store: Option<&str>,
) -> Result<Self> {
let scope = crate::scope::resolve_scope(
&crate::operator::load_operator_config()?,
crate::planes::Capability::Any,
crate::scope::ScopeFlags { profile, store, server, cluster: None, graph, uri },
)?;
require_graph_for_multi_graph_server(&scope).await?;
let (server, graph, uri) = (
scope.server.as_deref(),
scope.graph.as_deref(),
scope.uri,
);
let via_server = server.is_some();
let uri = apply_server_flag(server, graph, uri)?;
let token = resolve_remote_bearer_token(uri.as_deref())?;
let resolved = resolve_cli_graph(uri)?;
reject_positional_remote(via_server, &resolved.uri)?;
if resolved.is_remote {
if cli_as.is_some() {
bail!(
"`--as` is not allowed on a served write — the server resolves the actor \
from the bearer token. Remove `--as`, or run the write directly against \
storage with `--store <uri>`."
);
}
Ok(GraphClient::Remote {
http: build_http_client()?,
base_url: resolved.uri,
token,
})
} else {
let actor = resolve_cli_actor(cli_as)?;
Ok(GraphClient::Embedded {
uri: resolved.uri,
actor,
})
}
}
pub(crate) fn uri(&self) -> &str {
match self {
GraphClient::Embedded { uri, .. } => uri,
GraphClient::Remote { base_url, .. } => base_url,
}
}
pub(crate) fn is_remote(&self) -> bool {
matches!(self, GraphClient::Remote { .. })
}
async fn open_embedded(uri: &str) -> Result<Omnigraph> {
Ok(Omnigraph::open(uri).await?)
}
pub(crate) async fn branch_list(&self) -> Result<BranchListOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, &["branches"], &[])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
let mut branches = db.branch_list().await?;
branches.sort();
Ok(BranchListOutput { branches })
}
}
}
pub(crate) async fn snapshot(&self, branch: &str) -> Result<SnapshotOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, &["snapshot"], &[("branch", branch)])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
let snapshot = db.snapshot_of(ReadTarget::branch(branch)).await?;
Ok(snapshot_payload(branch, &snapshot))
}
}
}
pub(crate) async fn schema_source(&self) -> Result<SchemaOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, &["schema"], &[])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
Ok(SchemaOutput {
schema_source: db.schema_source().to_string(),
})
}
}
}
pub(crate) async fn list_commits(&self, branch: Option<&str>) -> Result<CommitListOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let url = match branch {
Some(branch) => remote_url(base_url, &["commits"], &[("branch", branch)])?,
None => remote_url(base_url, &["commits"], &[])?,
};
remote_json(http, Method::GET, url, None, token.as_deref()).await
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
let commits = db
.list_commits(branch)
.await?
.iter()
.map(commit_output)
.collect::<Vec<_>>();
Ok(CommitListOutput { commits })
}
}
}
pub(crate) async fn get_commit(&self, commit_id: &str) -> Result<CommitOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, &["commits", commit_id], &[])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
Ok(commit_output(&db.get_commit(commit_id).await?))
}
}
}
pub(crate) async fn load(
&self,
branch: &str,
from: Option<&str>,
data: &str,
mode: CliLoadMode,
) -> Result<LoadOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let data = std::fs::read_to_string(data)?;
let output = remote_json::<IngestOutput>(
http,
Method::POST,
remote_url(base_url, &["load"], &[])?,
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.to_string()),
from: from.map(ToOwned::to_owned),
mode: Some(mode.into()),
data,
})?),
token.as_deref(),
)
.await?;
Ok(load_output_from_tables(base_url, branch, mode.as_str(), &output))
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let result = db
.load_file_as(branch, from, data, mode.into(), actor.as_deref())
.await?;
Ok(load_output_from_result(uri, branch, mode.as_str(), &result))
}
}
}
pub(crate) async fn ingest(
&self,
branch: &str,
from: &str,
data: &str,
mode: CliLoadMode,
) -> Result<IngestOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let data = std::fs::read_to_string(data)?;
remote_json(
http,
Method::POST,
remote_url(base_url, &["ingest"], &[])?,
Some(serde_json::to_value(IngestRequest {
branch: Some(branch.to_string()),
from: Some(from.to_string()),
mode: Some(mode.into()),
data,
})?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let result = db
.load_file_as(branch, Some(from), data, mode.into(), actor.as_deref())
.await?;
Ok(ingest_output(uri, &result, mode.into(), None))
}
}
}
pub(crate) async fn mutate(
&self,
branch: &str,
query_source: &str,
query_name: Option<&str>,
params_json: Option<&Value>,
) -> Result<ChangeOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::POST,
remote_url(base_url, &["change"], &[])?,
Some(legacy_change_request_body(
query_source,
query_name,
branch,
params_json,
)),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let (selected_name, query_params) = select_named_query(query_source, query_name)?;
let params = query_params_from_json(&query_params, params_json)?;
let db = Self::open_embedded(uri).await?;
let actor = actor.as_deref();
let result = db
.mutate_as(branch, query_source, &selected_name, ¶ms, actor)
.await?;
Ok(ChangeOutput {
branch: branch.to_string(),
query_name: selected_name,
affected_nodes: result.affected_nodes,
affected_edges: result.affected_edges,
actor_id: actor.map(String::from),
})
}
}
}
pub(crate) async fn query(
&self,
target: ReadTarget,
query_source: &str,
query_name: Option<&str>,
params_json: Option<&Value>,
) -> Result<ReadOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let (branch, snapshot) = match &target {
ReadTarget::Branch(branch) => (Some(branch.clone()), None),
ReadTarget::Snapshot(snapshot) => (None, Some(snapshot.as_str().to_string())),
};
remote_json(
http,
Method::POST,
remote_url(base_url, &["read"], &[])?,
Some(serde_json::to_value(ReadRequest {
query_source: query_source.to_string(),
query_name: query_name.map(ToOwned::to_owned),
params: params_json.cloned(),
branch,
snapshot,
})?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, .. } => {
let (selected_name, query_params) = select_named_query(query_source, query_name)?;
let params = query_params_from_json(&query_params, params_json)?;
let db = Self::open_embedded(uri).await?;
let result = db
.query(target.clone(), query_source, &selected_name, ¶ms)
.await?;
Ok(read_output(selected_name, &target, result))
}
}
}
pub(crate) async fn invoke_named<T: serde::de::DeserializeOwned>(
&self,
name: &str,
expect_mutation: bool,
params_json: Option<&Value>,
branch: Option<String>,
snapshot: Option<String>,
) -> Result<T> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let body = InvokeStoredQueryRequest {
params: params_json.cloned(),
branch,
snapshot,
expect_mutation: Some(expect_mutation),
};
remote_json(
http,
Method::POST,
remote_url(base_url, &["queries", name], &[])?,
Some(serde_json::to_value(body)?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { .. } => bail!(
"by-name invocation needs a server (the stored-query catalog is \
server-owned); use -e '<gq>' or --query <file> for an ad-hoc query \
against --store, or address a server with --server / --profile"
),
}
}
pub(crate) async fn branch_create_from(
&self,
from: &str,
name: &str,
) -> Result<BranchCreateOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::POST,
remote_url(base_url, &["branches"], &[])?,
Some(serde_json::to_value(BranchCreateRequest {
from: Some(from.to_string()),
name: name.to_string(),
})?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let actor = actor.as_deref();
db.branch_create_from_as(ReadTarget::branch(from), name, actor)
.await?;
Ok(BranchCreateOutput {
uri: uri.clone(),
from: from.to_string(),
name: name.to_string(),
actor_id: actor.map(String::from),
})
}
}
}
pub(crate) async fn branch_delete(&self, name: &str) -> Result<BranchDeleteOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::DELETE,
remote_url(base_url, &["branches", name], &[])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let actor = actor.as_deref();
db.branch_delete_as(name, actor).await?;
Ok(BranchDeleteOutput {
uri: uri.clone(),
name: name.to_string(),
actor_id: actor.map(String::from),
})
}
}
}
pub(crate) async fn branch_merge(&self, source: &str, into: &str) -> Result<BranchMergeOutput> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::POST,
remote_url(base_url, &["branches", "merge"], &[])?,
Some(serde_json::to_value(BranchMergeRequest {
source: source.to_string(),
target: Some(into.to_string()),
})?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let actor = actor.as_deref();
let outcome = db.branch_merge_as(source, into, actor).await?;
Ok(BranchMergeOutput {
source: source.to_string(),
target: into.to_string(),
outcome: outcome.into(),
actor_id: actor.map(String::from),
})
}
}
}
pub(crate) async fn apply_schema<F>(
&self,
schema_source: &str,
allow_data_loss: bool,
validate: F,
) -> Result<SchemaApplyOutput>
where
F: FnOnce(&Catalog) -> omnigraph::error::Result<()>,
{
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json::<SchemaApplyOutput>(
http,
Method::POST,
remote_url(base_url, &["schema", "apply"], &[])?,
Some(serde_json::to_value(SchemaApplyRequest {
schema_source: schema_source.to_string(),
allow_data_loss,
})?),
token.as_deref(),
)
.await
}
GraphClient::Embedded { uri, actor } => {
let db = Self::open_embedded(uri).await?;
let result = db
.apply_schema_as_with_catalog_check(
schema_source,
omnigraph::db::SchemaApplyOptions { allow_data_loss },
actor.as_deref(),
validate,
)
.await?;
Ok(schema_apply_output(uri, result))
}
}
}
pub(crate) async fn export<W: Write>(
&self,
branch: &str,
type_names: &[String],
table_keys: &[String],
writer: &mut W,
) -> Result<()> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
let request = apply_bearer_token(
http.request(Method::POST, remote_url(base_url, &["export"], &[])?),
token.as_deref(),
)
.json(&ExportRequest {
branch: Some(branch.to_string()),
type_names: type_names.to_vec(),
table_keys: table_keys.to_vec(),
});
let mut response = request.send().await?;
let status = response.status();
if !status.is_success() {
let text = response.text().await?;
if let Ok(error) = serde_json::from_str::<ErrorOutput>(&text) {
bail!(error.error);
}
bail!("server returned {}: {}", status, text);
}
while let Some(chunk) = response.chunk().await? {
writer.write_all(&chunk)?;
}
writer.flush()?;
Ok(())
}
GraphClient::Embedded { uri, .. } => {
let db = Omnigraph::open(uri).await?;
db.export_jsonl_to_writer(branch, type_names, table_keys, writer)
.await?;
writer.flush()?;
Ok(())
}
}
}
pub(crate) async fn list_graphs(&self) -> Result<GraphListResponse> {
match self {
GraphClient::Remote {
http,
base_url,
token,
} => {
remote_json(
http,
Method::GET,
remote_url(base_url, &["graphs"], &[])?,
None,
token.as_deref(),
)
.await
}
GraphClient::Embedded { .. } => bail!(
"`omnigraph graphs list` requires a remote multi-graph server \
(--server <url>). To enumerate the graphs in a cluster, run \
`omnigraph cluster status --config <dir>`."
),
}
}
}