use std::path::Path;
use std::sync::{Arc, RwLock};
use anyhow::Result;
use kglite::api::cypher;
use kglite::api::{
compute_description, compute_schema, load_file, ConnectionDetail, CypherDetail, Embedder,
FluentDetail, KnowledgeGraph, Value,
};
use mcp_methods::server::McpServer;
use serde::{Deserialize, Serialize};
/// Message returned to callers by graph-backed operations in this file when no
/// graph is currently active (see `with_active`, `source_lookup`,
/// `run_cypher_template`).
const NO_GRAPH: &str =
"No active graph. Pass --graph X.kgl, or activate one via repo_management('org/repo').";
/// Cloneable handle to the (optional) active knowledge graph.
///
/// The slot lives behind an `Arc`, so every clone of a `GraphState` observes
/// the same active graph. `Default` yields a state with no active graph.
#[derive(Clone, Default)]
pub struct GraphState {
// `None` until `load_kgl` or `build_code_tree` installs a graph; both replace
// whatever was there before.
inner: Arc<RwLock<Option<ActiveGraph>>>,
}
/// The graph currently served by the tools, plus where it was loaded from.
struct ActiveGraph {
// Graph handle used for queries, schema inspection and source lookups.
kg: KnowledgeGraph,
// `Some(path)` when the graph was loaded from a file via `load_kgl`; `None`
// when it was built by `build_code_tree`. `run_save` refuses to run without it.
source_path: Option<std::path::PathBuf>,
}
impl GraphState {
pub fn new() -> Self {
Self::default()
}
pub fn load_kgl(&self, path: &Path) -> Result<()> {
let dir = load_file(&path.to_string_lossy())
.map_err(|e| anyhow::anyhow!("kglite::load_file failed: {}", e))?;
let kg = KnowledgeGraph::from_arc(dir);
*self.inner.write().unwrap() = Some(ActiveGraph {
kg,
source_path: Some(path.to_path_buf()),
});
Ok(())
}
pub fn build_code_tree(&self, dir: &Path) -> Result<()> {
let dir_arc = kglite::api::build_code_tree(dir, false, true, None, None)
.map_err(|e| anyhow::anyhow!("kglite::build_code_tree failed: {}", e))?;
let kg = KnowledgeGraph::from_arc(dir_arc);
*self.inner.write().unwrap() = Some(ActiveGraph {
kg,
source_path: None,
});
Ok(())
}
pub fn bind_embedder(&self, embedder: Arc<dyn Embedder>) -> Result<()> {
let mut guard = self.inner.write().unwrap();
let Some(active) = guard.as_mut() else {
tracing::warn!("embedder loaded before any graph is active; binding deferred");
return Ok(());
};
active.kg.set_embedder_native(embedder);
Ok(())
}
pub fn schema(&self) -> Option<(u64, u64)> {
let guard = self.inner.read().unwrap();
let active = guard.as_ref()?;
let overview = compute_schema(active.kg.dir());
Some((overview.node_count as u64, overview.edge_count as u64))
}
pub fn has_node_type(&self, node_type: &str) -> bool {
let guard = self.inner.read().unwrap();
guard
.as_ref()
.map(|active| active.kg.dir().has_node_type(node_type))
.unwrap_or(false)
}
pub fn has_property(&self, node_type: &str, prop_name: &str) -> bool {
let guard = self.inner.read().unwrap();
guard
.as_ref()
.map(|active| {
active
.kg
.dir()
.get_node_type_metadata(node_type)
.map(|meta| meta.contains_key(prop_name))
.unwrap_or(false)
})
.unwrap_or(false)
}
fn with_active<F>(&self, f: F) -> String
where
F: FnOnce(&ActiveGraph) -> String,
{
let guard = self.inner.read().unwrap();
match guard.as_ref() {
Some(active) => f(active),
None => NO_GRAPH.to_string(),
}
}
pub fn with_kg<F, T>(&self, f: F) -> Option<T>
where
F: FnOnce(&kglite::api::KnowledgeGraph) -> T,
{
let guard = self.inner.read().unwrap();
guard.as_ref().map(|active| f(&active.kg))
}
pub fn source_lookup(
&self,
qualified_name: &str,
node_type: Option<&str>,
) -> Result<crate::code_source::SourceLookup, String> {
let guard = self.inner.read().unwrap();
let Some(active) = guard.as_ref() else {
return Err(NO_GRAPH.to_string());
};
match active.kg.source_location(qualified_name, node_type) {
kglite::api::SourceLookup::Found(loc) => {
let file_path = loc.file_path.ok_or_else(|| {
format!("graph.source({qualified_name:?}) returned no file_path")
})?;
let line_number = loc.line_number.unwrap_or(1).max(1) as usize;
let end_line = loc.end_line.unwrap_or(loc.line_number.unwrap_or(1)).max(1) as usize;
Ok(crate::code_source::SourceLookup {
file_path,
line_number,
end_line,
})
}
kglite::api::SourceLookup::Ambiguous(matches) => Err(format!(
"ambiguous qualified_name {qualified_name:?}; matches: {matches:?}. \
Pass `node_type` to narrow."
)),
kglite::api::SourceLookup::NotFound => Err(format!(
"graph.source({qualified_name:?}) returned no match. \
Try passing `node_type` or using a different qualified name."
)),
}
}
pub fn run_cypher_template(
&self,
template: &str,
args: &serde_json::Map<String, serde_json::Value>,
csv_http: Option<&crate::csv_http::CsvHttpConfig>,
) -> String {
let guard = self.inner.read().unwrap();
let Some(active) = guard.as_ref() else {
return NO_GRAPH.to_string();
};
let mut params = std::collections::HashMap::new();
for (k, v) in args {
params.insert(k.clone(), json_to_value(v));
}
match run_cypher_inner(&active.kg, template, params, csv_http) {
Ok(body) => body,
Err(e) => format!("Cypher error: {e}"),
}
}
}
fn json_to_value(v: &serde_json::Value) -> Value {
match v {
serde_json::Value::Null => Value::Null,
serde_json::Value::Bool(b) => Value::Boolean(*b),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Value::Int64(i)
} else if let Some(f) = n.as_f64() {
Value::Float64(f)
} else {
Value::Null
}
}
serde_json::Value::String(s) => Value::String(s.clone()),
other => Value::String(other.to_string()),
}
}
/// Parses, vets and executes a read-only Cypher `query` against `kg`.
///
/// * `params` – named query parameters.
/// * `csv_http` – when set and the query requests `FORMAT CSV`, the CSV is
///   written through `crate::csv_http::write_csv` and a fetch URL is returned
///   instead of the CSV body.
///
/// Returns the text to hand back to the caller: an inline table, a CSV
/// string, or a "written to URL" notice.
///
/// # Errors
/// Returns a message when the query fails to parse, is a mutation query, or
/// fails during execution.
fn run_cypher_inner(
    kg: &KnowledgeGraph,
    query: &str,
    params: std::collections::HashMap<String, Value>,
    csv_http: Option<&crate::csv_http::CsvHttpConfig>,
) -> Result<String, String> {
    // Parse up front so mutation queries are rejected before anything runs.
    let pre_parsed = kglite::api::cypher::parse_cypher(query).map_err(|e| e.to_string())?;
    if kglite::api::cypher::is_mutation_query(&pre_parsed) {
        return Err(
            "mutation Cypher (CREATE/SET/DELETE/REMOVE/MERGE) is not allowed through \
             the MCP cypher_query tool. Use the kglite CLI for graph edits."
                .to_string(),
        );
    }
    let output_csv = pre_parsed.output_format == kglite::api::cypher::OutputFormat::Csv;
    let embedder = kg.embedder().cloned();
    let opts = kglite::api::session::ExecuteOptions {
        // Borrow of the parameter map; the original text here had been
        // corrupted into a non-Rust token by an HTML-entity mis-decoding.
        params: &params,
        deadline: None,
        max_rows: None,
        lazy_eligible: false,
        disabled_passes: None,
        embedder,
    };
    let outcome = kglite::api::session::execute_read(kg.dir(), query, &opts)
        .map_err(|e| format!("Cypher execution error: {e}"))?;
    let result = outcome.result;

    if !output_csv {
        return Ok(format_cypher_inline(&result));
    }

    let csv = result.to_csv();
    let Some(cfg) = csv_http else {
        return Ok(csv);
    };
    match crate::csv_http::write_csv(cfg, &csv) {
        Ok(name) => {
            let url = cfg.url_for(&name);
            let row_count = count_csv_rows(&csv);
            Ok(format!(
                "FORMAT CSV: {row_count} row(s) written to {url}\n\
                 Fetch with: curl {url}"
            ))
        }
        Err(e) => {
            // Best effort: a failed write must not lose the query result.
            tracing::warn!(error = %e, "csv_http write_csv failed; falling back to inline");
            Ok(csv)
        }
    }
}
/// Renders a query result as a tab-separated table preceded by a row-count
/// header; at most the first 15 rows are included.
///
/// Returns `"No results."` for an empty result.
fn format_cypher_inline(result: &cypher::CypherResult) -> String {
    let len = result.rows.len();
    let mut out = match len {
        0 => return "No results.".to_string(),
        _ if len > 15 => format!("{len} row(s) (showing first 15):\n"),
        _ => format!("{len} row(s):\n"),
    };

    out.push_str(&result.columns.join("\t"));
    out.push('\n');

    for row in result.rows.iter().take(15) {
        let mut cells = row.iter();
        if let Some(first) = cells.next() {
            push_value_repr(&mut out, first);
        }
        for cell in cells {
            out.push('\t');
            push_value_repr(&mut out, cell);
        }
        out.push('\n');
    }
    out
}
/// Counts the data rows (records after the header) in a CSV document.
///
/// Records are delimited by newlines that occur *outside* double-quoted
/// fields, so a quoted value containing line breaks counts as one row rather
/// than several. A doubled quote (`""`) inside a quoted field toggles the
/// quote state twice and therefore leaves it unchanged. For CSV without any
/// quotes the result equals "number of lines minus one".
///
/// NOTE(review): assumes the producer quotes fields per RFC 4180 — confirm
/// against the CSV writer that feeds this function.
fn count_csv_rows(csv: &str) -> usize {
    let mut in_quotes = false;
    let mut records = 0usize;
    // Tracks whether the current (unterminated) record has any characters,
    // so a final record without a trailing newline is still counted.
    let mut pending = false;

    for ch in csv.chars() {
        match ch {
            '"' => {
                in_quotes = !in_quotes;
                pending = true;
            }
            '\n' if !in_quotes => {
                records += 1;
                pending = false;
            }
            _ => pending = true,
        }
    }
    if pending {
        records += 1;
    }
    // The first record is the header.
    records.saturating_sub(1)
}
/// Appends a compact textual rendering of `val` to `out`.
///
/// Scalars are written directly (strings in debug-quoted form); lists, maps,
/// nodes, relationships and paths are written as JSON, falling back to `?`
/// when serialisation fails.
fn push_value_repr(out: &mut String, val: &Value) {
    use std::fmt::Write as _;
    // Writing into a `String` does not fail, so the `fmt::Result` of each arm
    // is deliberately discarded once, here.
    let _ = match val {
        Value::Null => out.write_str("null"),
        Value::Boolean(b) => out.write_str(if *b { "true" } else { "false" }),
        Value::String(s) => write!(out, "{s:?}"),
        Value::Int64(n) => write!(out, "{n}"),
        Value::Float64(f) => write!(out, "{f}"),
        Value::UniqueId(u) => write!(out, "{u}"),
        Value::NodeRef(idx) => write!(out, "node[{idx}]"),
        Value::DateTime(d) => out.write_str(&d.format("%Y-%m-%d").to_string()),
        // Longitude first, then latitude.
        Value::Point { lat, lon } => write!(out, "POINT({lon} {lat})"),
        Value::Duration {
            months,
            days,
            seconds,
        } => write!(out, "duration(M={months}, D={days}, S={seconds})"),
        Value::List(_)
        | Value::Map(_)
        | Value::Node(_)
        | Value::Relationship(_)
        | Value::Path(_) => {
            let json = serde_json::to_string(val).unwrap_or_else(|_| "?".to_string());
            out.write_str(&json)
        }
    };
}
// Arguments of the `cypher_query` tool.
// NOTE(review): plain `//` comments are used here on purpose — `///` doc
// comments on a `schemars::JsonSchema` type presumably end up in the generated
// schema; confirm before converting.
#[derive(Debug, Default, Deserialize, Serialize, schemars::JsonSchema)]
struct CypherArgs {
// Cypher text handed to `run_cypher_tool`.
pub query: String,
}
// Arguments of the `graph_overview` tool. Every field is optional; omitted
// fields deserialize to `None` and are skipped when serializing.
// NOTE(review): plain `//` comments are used here on purpose — `///` doc
// comments on a `schemars::JsonSchema` type presumably end up in the generated
// schema; confirm before converting.
#[derive(Debug, Default, Deserialize, Serialize, schemars::JsonSchema)]
struct OverviewArgs {
// Forwarded to `compute_description` as its type filter.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub types: Option<Vec<String>>,
// Raw JSON (bool or array) interpreted by `parse_connection_detail`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub connections: Option<serde_json::Value>,
// Raw JSON (bool or array) interpreted by `parse_cypher_detail`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cypher: Option<serde_json::Value>,
}
// Arguments of the `save_graph` tool; it takes no parameters.
#[derive(Debug, Default, Deserialize, Serialize, schemars::JsonSchema)]
struct SaveGraphArgs {}
/// Switches controlling which built-in tools and side effects `register` sets up.
#[derive(Clone, Debug, Default)]
pub struct Builtins {
// Register the `save_graph` tool.
pub save_graph: bool,
// Wipe the contents of `temp_dir` whenever `graph_overview` is called with
// no arguments.
pub temp_cleanup_on_overview: bool,
// Directory emptied by the cleanup above; nothing is wiped when `None`.
pub temp_dir: Option<std::path::PathBuf>,
}
pub fn register(
server: &mut McpServer,
state: GraphState,
builtins: Builtins,
csv_http: Option<Arc<crate::csv_http::CsvHttpConfig>>,
) {
let s = state.clone();
let csv = csv_http.clone();
let cypher_desc: &'static str = if csv.is_some() {
"Run a Cypher query against the active knowledge graph. Returns up to 15 rows \
inline; append FORMAT CSV to export results — large CSVs are written to the \
csv_http_server directory and returned as a fetch URL."
} else {
"Run a Cypher query against the active knowledge graph. Returns up to 15 rows \
inline; append FORMAT CSV to export full results to a CSV string."
};
server.register_typed_tool::<CypherArgs, _>("cypher_query", cypher_desc, move |args| {
let csv = csv.clone();
s.with_active(|g| run_cypher_tool(g, &args.query, csv.as_deref()))
});
let s = state.clone();
let cleanup_temp = builtins.temp_cleanup_on_overview;
let temp_dir = builtins.temp_dir.clone();
server.register_typed_tool::<OverviewArgs, _>(
"graph_overview",
"Inspect the active graph's schema. With no args returns the inventory; pass \
types=[...] / connections=true|[...] / cypher=true|[...] for drill-down.",
move |args| {
if cleanup_temp
&& args.types.is_none()
&& args.connections.is_none()
&& args.cypher.is_none()
{
if let Some(dir) = temp_dir.as_deref() {
wipe_temp_dir(dir);
}
}
s.with_active(|g| run_overview(g, &args))
},
);
if builtins.save_graph {
let s = state;
server.register_typed_tool::<SaveGraphArgs, _>(
"save_graph",
"Persist the active graph to its source .kgl file (single-graph mode only).",
move |_| s.with_active(run_save),
);
}
}
/// Best-effort removal of every entry directly inside `dir` (the directory
/// itself is kept).
///
/// Failures are logged at debug level and otherwise ignored; the number of
/// removed entries is logged at info level when non-zero.
fn wipe_temp_dir(dir: &std::path::Path) {
    if !dir.is_dir() {
        tracing::debug!(dir = %dir.display(), "temp_cleanup: directory does not exist; nothing to wipe");
        return;
    }

    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(e) => {
            tracing::debug!(error = %e, dir = %dir.display(), "temp_cleanup: read_dir failed");
            return;
        }
    };

    let mut wiped = 0usize;
    // Entries that cannot be read are skipped silently.
    for path in listing.flatten().map(|entry| entry.path()) {
        let removal = if path.is_dir() {
            std::fs::remove_dir_all(&path)
        } else {
            std::fs::remove_file(&path)
        };
        if let Err(e) = removal {
            tracing::debug!(path = %path.display(), error = %e, "temp_cleanup: remove failed");
            continue;
        }
        wiped += 1;
    }

    if wiped > 0 {
        tracing::info!(count = wiped, dir = %dir.display(), "temp_cleanup: wiped entries");
    }
}
/// Handler body of the `cypher_query` tool: runs `query` without parameters
/// and turns any failure into a `Cypher error: …` string.
fn run_cypher_tool(
    graph: &ActiveGraph,
    query: &str,
    csv_http: Option<&crate::csv_http::CsvHttpConfig>,
) -> String {
    let no_params = std::collections::HashMap::new();
    run_cypher_inner(&graph.kg, query, no_params, csv_http)
        .unwrap_or_else(|e| format!("Cypher error: {e}"))
}
/// Handler body of the `graph_overview` tool: describes the active graph at
/// the level of detail requested in `args`, with fluent detail switched off.
///
/// Failures are returned as a `graph_overview error: …` string.
fn run_overview(graph: &ActiveGraph, args: &OverviewArgs) -> String {
    let connections = parse_connection_detail(args.connections.as_ref());
    let cypher_detail = parse_cypher_detail(args.cypher.as_ref());
    compute_description(
        graph.kg.dir(),
        args.types.as_deref(),
        &connections,
        &cypher_detail,
        &FluentDetail::Off,
        None,
        None,
        None,
    )
    .unwrap_or_else(|e| format!("graph_overview error: {e}"))
}
/// Interprets the `connections` argument of `graph_overview`.
///
/// Absent, `null` and `false` mean `Off`; an array with at least one string
/// selects those topics (non-string items are ignored); everything else —
/// including `true` and an array without strings — means `Overview`.
fn parse_connection_detail(v: Option<&serde_json::Value>) -> ConnectionDetail {
    use serde_json::Value as Json;
    let Some(value) = v else {
        return ConnectionDetail::Off;
    };
    match value {
        Json::Null | Json::Bool(false) => ConnectionDetail::Off,
        Json::Array(items) => {
            let topics: Vec<String> = items
                .iter()
                .filter_map(Json::as_str)
                .map(str::to_owned)
                .collect();
            if topics.is_empty() {
                ConnectionDetail::Overview
            } else {
                ConnectionDetail::Topics(topics)
            }
        }
        _ => ConnectionDetail::Overview,
    }
}
/// Interprets the `cypher` argument of `graph_overview`.
///
/// Absent, `null` and `false` mean `Off`; an array with at least one string
/// selects those topics (non-string items are ignored); everything else —
/// including `true` and an array without strings — means `Overview`.
fn parse_cypher_detail(v: Option<&serde_json::Value>) -> CypherDetail {
    use serde_json::Value as Json;
    let requested = match v {
        None | Some(Json::Null | Json::Bool(false)) => return CypherDetail::Off,
        Some(Json::Array(items)) => items,
        Some(_) => return CypherDetail::Overview,
    };
    let topics: Vec<String> = requested
        .iter()
        .filter_map(|item| item.as_str())
        .map(String::from)
        .collect();
    if topics.is_empty() {
        CypherDetail::Overview
    } else {
        CypherDetail::Topics(topics)
    }
}
/// Handler body of the `save_graph` tool: writes the active graph back to the
/// file it was loaded from and reports the saved node and edge counts.
///
/// Returns an explanatory message when the graph has no source path, or a
/// `save_graph error: …` string when saving fails.
fn run_save(graph: &ActiveGraph) -> String {
    let Some(path) = graph.source_path.as_ref() else {
        return "save_graph requires --graph mode (no source path bound).".to_string();
    };
    let path_str = path.to_string_lossy().into_owned();
    // `save_graph` wants a mutable handle; clone the `Arc` (cheap) rather
    // than touching the handle owned by the active graph.
    let mut dir_arc = graph.kg.dir().clone();
    match kglite::api::save_graph(&mut dir_arc, &path_str) {
        Ok(()) => {
            // Only a shared borrow is needed to read the counts. The previous
            // `Arc::make_mut` could deep-copy the whole graph here whenever
            // the handle was still shared with the active graph.
            let overview = compute_schema(&dir_arc);
            format!(
                "Saved {path_str} ({} nodes, {} edges).",
                overview.node_count, overview.edge_count
            )
        }
        Err(e) => format!("save_graph error: {e}"),
    }
}