use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use car_ir::ActionProposal;
use car_memgine::MemgineEngine;
use tokio::sync::Mutex;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::error_codes::{
INTERNAL as E_INTERNAL, INVALID_PARAMS as E_INVALID_PARAMS,
INVALID_REQUEST as E_INVALID_REQUEST, METHOD_NOT_FOUND as E_METHOD_NOT_FOUND,
};
use crate::schemas::{cached_prompt_schemas, cached_tool_schemas};
use crate::{PROTOCOL_VERSION, SERVER_NAME};
/// Incoming JSON-RPC request envelope, deserialized from the wire.
///
/// `Server::handle` treats a request whose `id` is `None` as a notification
/// and produces no response for it.
#[derive(Debug, Deserialize)]
pub struct Request {
/// Protocol version tag; `Server::handle` answers requests that carry an id
/// with an error unless this is exactly `"2.0"`.
pub jsonrpc: String,
/// Request identifier, echoed back in the response; defaults to `None`
/// when the field is absent.
#[serde(default)]
pub id: Option<Value>,
/// Method name used for dispatch, e.g. `"tools/call"`.
pub method: String,
/// Method parameters; takes `Value`'s default when the field is absent.
#[serde(default)]
pub params: Value,
}
/// Outgoing JSON-RPC response envelope.
///
/// The `ok` and `err` constructors in this file populate exactly one of
/// `result` / `error`; whichever is `None` is omitted from the serialized form.
#[derive(Debug, Serialize)]
pub struct Response {
/// Protocol version tag; both constructors in this file set it to `"2.0"`.
pub jsonrpc: &'static str,
/// Identifier copied from the request being answered.
pub id: Value,
/// Success payload; skipped during serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
/// Failure payload; skipped during serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ErrorObj>,
}
/// Error member of a [`Response`]: a numeric code plus a human-readable message.
#[derive(Debug, Serialize)]
pub struct ErrorObj {
/// Numeric error code; callers in this file pass the `crate::error_codes` constants.
pub code: i32,
/// Human-readable description of the failure.
pub message: String,
}
/// Builds a successful response for `id` that carries `result` and no error.
pub fn ok(id: Value, result: Value) -> Response {
    let payload = Some(result);
    Response {
        error: None,
        result: payload,
        id,
        jsonrpc: "2.0",
    }
}
/// Builds a failed response for `id` carrying `code` and `message` and no result.
pub fn err(id: Value, code: i32, message: impl Into<String>) -> Response {
    let failure = ErrorObj {
        message: message.into(),
        code,
    };
    Response {
        error: Some(failure),
        result: None,
        id,
        jsonrpc: "2.0",
    }
}
/// Failure raised by a tool, resource, or prompt handler.
///
/// Each variant carries the message that is sent back to the client; the
/// variant itself selects the error code (see `ToolError::code`).
#[derive(Debug)]
pub enum ToolError {
/// The caller supplied missing or unusable parameters.
InvalidParams(String),
/// The handler failed for a reason not attributable to the caller's input.
Internal(String),
/// `tools/call` named a tool this server does not provide.
UnknownTool(String),
}
impl ToolError {
    /// Returns the error code that corresponds to this variant.
    pub fn code(&self) -> i32 {
        match *self {
            Self::UnknownTool(_) => E_METHOD_NOT_FOUND,
            Self::Internal(_) => E_INTERNAL,
            Self::InvalidParams(_) => E_INVALID_PARAMS,
        }
    }

    /// Returns the message carried by this error, whichever variant it is.
    pub fn message(&self) -> &str {
        // Every variant wraps exactly one `String`, so this pattern is irrefutable.
        let (Self::InvalidParams(text) | Self::Internal(text) | Self::UnknownTool(text)) = self;
        text.as_str()
    }
}
/// Creates the `InvalidParams` error reported when a required `field` is absent.
fn missing(field: &str) -> ToolError {
    let detail = format!("missing {field}");
    ToolError::InvalidParams(detail)
}
/// JSON-RPC server state: dispatches tool, resource, and prompt requests
/// against a shared `MemgineEngine`.
pub struct Server {
/// Shared handle to the memory engine; handlers lock it while they work on it.
memgine: Arc<Mutex<MemgineEngine>>,
/// Counter used by `tool_add_fact` to mint `mcp-<n>` fact ids; starts at 0.
next_fact_id: AtomicU64,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
/// Creates a server that owns a fresh engine built with `MemgineEngine::new(None)`.
pub fn new() -> Self {
    let engine = MemgineEngine::new(None);
    let shared = Arc::new(Mutex::new(engine));
    Self::with_memgine(shared)
}
/// Creates a server around an existing, possibly shared, engine handle.
///
/// The fact-id counter always starts at zero.
pub fn with_memgine(memgine: Arc<Mutex<MemgineEngine>>) -> Self {
    let next_fact_id = AtomicU64::new(0);
    Self {
        next_fact_id,
        memgine,
    }
}
pub async fn handle(&self, req: Request) -> Option<Response> {
let id = match req.id.clone() {
Some(id) => id,
None => {
tracing::debug!(method = %req.method, "notification received");
return None;
}
};
if req.jsonrpc != "2.0" {
return Some(err(id, E_INVALID_REQUEST, "jsonrpc must be \"2.0\""));
}
match req.method.as_str() {
"initialize" => Some(ok(
id,
json!({
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {
"tools": {},
"resources": { "subscribe": false, "listChanged": false },
"prompts": { "listChanged": false },
},
"serverInfo": { "name": SERVER_NAME, "version": env!("CARGO_PKG_VERSION") },
}),
)),
"ping" => Some(ok(id, json!({}))),
"tools/list" => Some(ok(id, json!({ "tools": cached_tool_schemas() }))),
"tools/call" => Some(match self.tools_call(&req.params).await {
Ok(v) => ok(id, v),
Err(e) => err(id, e.code(), e.message()),
}),
"resources/list" => Some(match self.resources_list().await {
Ok(v) => ok(id, v),
Err(e) => err(id, e.code(), e.message()),
}),
"resources/read" => Some(match self.resources_read(&req.params).await {
Ok(v) => ok(id, v),
Err(e) => err(id, e.code(), e.message()),
}),
"prompts/list" => Some(ok(id, json!({ "prompts": cached_prompt_schemas() }))),
"prompts/get" => Some(match self.prompts_get(&req.params).await {
Ok(v) => ok(id, v),
Err(e) => err(id, e.code(), e.message()),
}),
other => Some(err(
id,
E_METHOD_NOT_FOUND,
format!("method not found: {}", other),
)),
}
}
/// Handles `tools/call`: routes to the tool named by `params.name` and wraps
/// its textual output in a single-element `content` array.
///
/// # Errors
/// * `InvalidParams` when `name` is absent or not a string.
/// * `UnknownTool` when no tool with that name exists.
/// * Whatever error the selected tool returns.
async fn tools_call(&self, params: &Value) -> Result<Value, ToolError> {
    let name = params
        .get("name")
        .and_then(Value::as_str)
        .ok_or_else(|| missing("name"))?;
    // Borrow the arguments rather than deep-cloning them: every tool takes
    // `&Value`, and payloads such as skill source code can be large.
    let args = params.get("arguments").unwrap_or(&Value::Null);
    let text = match name {
        "memory_add_fact" => self.tool_add_fact(args).await?,
        "memory_query" => self.tool_query(args).await?,
        "skill_find" => self.tool_skill_find(args).await?,
        "skill_ingest" => self.tool_skill_ingest(args).await?,
        "skill_list" => self.tool_skill_list(args).await?,
        "verify" => self.tool_verify(args)?,
        other => return Err(ToolError::UnknownTool(format!("unknown tool: {}", other))),
    };
    Ok(json!({
        "content": [{ "type": "text", "text": text }],
        "isError": false,
    }))
}
async fn tool_add_fact(&self, args: &Value) -> Result<String, ToolError> {
let subject = args
.get("subject")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("subject"))?;
let body = args
.get("body")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("body"))?;
let kind = args
.get("kind")
.and_then(|v| v.as_str())
.unwrap_or("pattern");
let mut engine = self.memgine.lock().await;
let fid = format!("mcp-{}", self.next_fact_id.fetch_add(1, Ordering::Relaxed));
engine.ingest_fact(
&fid,
subject,
body,
"user",
"mcp",
chrono::Utc::now(),
"global",
None,
vec![],
kind == "constraint",
);
Ok(format!(
"fact ingested id={} total={}",
fid,
engine.valid_fact_count()
))
}
async fn tool_query(&self, args: &Value) -> Result<String, ToolError> {
let query = args
.get("query")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("query"))?;
let k = args.get("k").and_then(|v| v.as_u64()).unwrap_or(5) as usize;
let engine = self.memgine.lock().await;
let seeds = engine.graph.find_seeds(query, 5);
let hits = if !seeds.is_empty() {
engine.graph.retrieve(&seeds, 3, k, 0.6, 0.05)
} else {
vec![]
};
let out: Vec<Value> = hits
.iter()
.filter_map(|hit| {
let node = engine.graph.inner.node_weight(hit.node_ix)?;
Some(json!({
"subject": node.key,
"body": node.value,
"activation": hit.activation,
}))
})
.collect();
serde_json::to_string(&out).map_err(|e| ToolError::Internal(e.to_string()))
}
async fn tool_skill_find(&self, args: &Value) -> Result<String, ToolError> {
let persona = args.get("persona").and_then(|v| v.as_str()).unwrap_or("");
let url = args.get("url").and_then(|v| v.as_str()).unwrap_or("");
let task = args
.get("task")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("task"))?;
let k = args.get("k").and_then(|v| v.as_u64()).unwrap_or(3) as usize;
let engine = self.memgine.lock().await;
let results = engine.find_skill(persona, url, task, k);
let out: Vec<Value> = results
.iter()
.map(|(meta, score)| json!({ "skill": meta, "score": score }))
.collect();
serde_json::to_string(&out).map_err(|e| ToolError::Internal(e.to_string()))
}
async fn tool_skill_ingest(&self, args: &Value) -> Result<String, ToolError> {
let name = args
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("name"))?;
let code = args
.get("code")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("code"))?;
let platform = args
.get("platform")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let persona = args.get("persona").and_then(|v| v.as_str()).unwrap_or("");
let url_pattern = args
.get("url_pattern")
.and_then(|v| v.as_str())
.unwrap_or("");
let description = args
.get("description")
.and_then(|v| v.as_str())
.unwrap_or("");
let supersedes = args.get("supersedes").and_then(|v| v.as_str());
let keywords: Vec<String> = args
.get("task_keywords")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let trigger = car_memgine::SkillTrigger {
persona: persona.into(),
url_pattern: url_pattern.into(),
task_keywords: keywords,
};
let mut engine = self.memgine.lock().await;
engine.ingest_skill(
name,
code,
platform,
trigger,
description,
supersedes,
vec![],
vec![],
);
Ok(format!("skill ingested: {}", name))
}
async fn tool_skill_list(&self, args: &Value) -> Result<String, ToolError> {
let domain = args.get("domain").and_then(|v| v.as_str());
let engine = self.memgine.lock().await;
let skills: Vec<Value> = engine
.graph
.inner
.node_indices()
.filter_map(|nix| {
let node = engine.graph.inner.node_weight(nix)?;
if node.kind != car_memgine::MemKind::Skill {
return None;
}
let meta = car_memgine::SkillMeta::from_node(node)?;
if let Some(d) = domain {
match &meta.scope {
car_memgine::SkillScope::Global => {}
car_memgine::SkillScope::Domain(sd) if sd == d => {}
_ => return None,
}
}
serde_json::to_value(&meta).ok()
})
.collect();
serde_json::to_string(&skills).map_err(|e| ToolError::Internal(e.to_string()))
}
/// Handles `resources/list`: one descriptor per fact node that has a fact id
/// and one per skill node; every other node kind is ignored.
///
/// Facts are exposed as `car://memory/fact/<fact_id>` (plain text) and skills
/// as `car://memory/skill/<key>` (JSON).
async fn resources_list(&self) -> Result<Value, ToolError> {
    let engine = self.memgine.lock().await;
    let graph = &engine.graph.inner;

    let resources: Vec<Value> = graph
        .node_indices()
        .filter_map(|nix| {
            let node = graph.node_weight(nix)?;
            match node.kind {
                car_memgine::MemKind::Fact => {
                    // Facts without an id have no addressable URI.
                    let fid = node.fact_id.as_deref()?;
                    Some(json!({
                        "uri": format!("car://memory/fact/{}", fid),
                        "name": node.key,
                        "description": if node.is_constraint { "CAR constraint" } else { "CAR fact" },
                        "mimeType": "text/plain",
                    }))
                }
                car_memgine::MemKind::Skill => Some(json!({
                    "uri": format!("car://memory/skill/{}", node.key),
                    "name": node.key,
                    "description": "CAR skill",
                    "mimeType": "application/json",
                })),
                _ => None,
            }
        })
        .collect();

    Ok(json!({ "resources": resources }))
}
async fn resources_read(&self, params: &Value) -> Result<Value, ToolError> {
let uri = params
.get("uri")
.and_then(|v| v.as_str())
.ok_or_else(|| missing("uri"))?;
let engine = self.memgine.lock().await;
if let Some(fid) = uri.strip_prefix("car://memory/fact/") {
for nix in engine.graph.inner.node_indices() {
let Some(node) = engine.graph.inner.node_weight(nix) else {
continue;
};
if node.kind != car_memgine::MemKind::Fact {
continue;
}
if node.fact_id.as_deref() == Some(fid) {
return Ok(json!({
"contents": [{
"uri": uri,
"mimeType": "text/plain",
"text": format!("{}\n\n{}", node.key, node.value),
}],
}));
}
}
return Err(ToolError::InvalidParams(format!("fact not found: {}", fid)));
}
if let Some(name) = uri.strip_prefix("car://memory/skill/") {
for nix in engine.graph.inner.node_indices() {
let Some(node) = engine.graph.inner.node_weight(nix) else {
continue;
};
if node.kind != car_memgine::MemKind::Skill {
continue;
}
if node.key == name {
let meta = car_memgine::SkillMeta::from_node(node);
let body = serde_json::to_string_pretty(&meta)
.unwrap_or_else(|_| node.value.clone());
return Ok(json!({
"contents": [{
"uri": uri,
"mimeType": "application/json",
"text": body,
}],
}));
}
}
return Err(ToolError::InvalidParams(format!("skill not found: {}", name)));
}
Err(ToolError::InvalidParams(format!(
"unsupported uri scheme: {}",
uri
)))
}
/// Handles `prompts/get` for the `car_context` prompt: builds context text
/// for `arguments.query` and returns it as a single user message.
///
/// `arguments.mode` selects the builder: `"fast"` uses `build_context_fast`;
/// `"full"`, the empty string, or an absent mode use `build_context`.
///
/// # Errors
/// `InvalidParams` when `name` is absent or not `car_context`, when
/// `arguments.query` is absent, or when `mode` is not recognized.
async fn prompts_get(&self, params: &Value) -> Result<Value, ToolError> {
    let name = params
        .get("name")
        .and_then(Value::as_str)
        .ok_or_else(|| missing("name"))?;
    if name != "car_context" {
        return Err(ToolError::InvalidParams(format!("unknown prompt: {}", name)));
    }
    // Only read from here on, so borrow the arguments instead of cloning them.
    let args = params.get("arguments").unwrap_or(&Value::Null);
    let query = args
        .get("query")
        .and_then(Value::as_str)
        .ok_or_else(|| missing("arguments.query"))?;
    let mode = args.get("mode").and_then(Value::as_str).unwrap_or("full");
    let mut engine = self.memgine.lock().await;
    let text = match mode {
        "fast" => engine.build_context_fast(query),
        "full" | "" => engine.build_context(query),
        other => return Err(ToolError::InvalidParams(format!("unknown mode: {}", other))),
    };
    Ok(json!({
        "description": "CAR four-layer context (identity → constraints → facts → conversation → environment → known-unknowns) assembled for the query.",
        "messages": [{
            "role": "user",
            "content": { "type": "text", "text": text },
        }],
    }))
}
fn tool_verify(&self, args: &Value) -> Result<String, ToolError> {
let proposal_val = args.get("proposal").ok_or_else(|| missing("proposal"))?;
let proposal: ActionProposal = serde_json::from_value(proposal_val.clone())
.map_err(|e| ToolError::InvalidParams(format!("proposal: {}", e)))?;
let max_actions = args
.get("max_actions")
.and_then(|v| v.as_u64())
.unwrap_or(30) as usize;
let result = car_verify::verify(&proposal, None, None, max_actions);
serde_json::to_string(&json!({
"valid": result.valid,
"issues": result.issues.iter().map(|i| json!({
"action_id": i.action_id,
"severity": i.severity,
"message": i.message,
})).collect::<Vec<_>>(),
"simulated_state": result.simulated_state,
}))
.map_err(|e| ToolError::Internal(e.to_string()))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a JSON-RPC 2.0 request with a numeric id.
    fn make_request(method: &str, params: Value, id: u64) -> Request {
        Request {
            jsonrpc: String::from("2.0"),
            id: Some(Value::from(id)),
            method: method.to_owned(),
            params,
        }
    }

    /// `initialize` reports the crate's protocol version and server name.
    #[tokio::test]
    async fn initialize_returns_protocol_version() {
        let server = Server::new();
        let request = make_request("initialize", json!({}), 1);
        let response = server.handle(request).await.expect("response");
        let payload = response.result.unwrap();
        assert_eq!(payload["protocolVersion"], PROTOCOL_VERSION);
        assert_eq!(payload["serverInfo"]["name"], SERVER_NAME);
    }

    /// A request without an id is a notification and gets no response.
    #[tokio::test]
    async fn notification_returns_none() {
        let server = Server::new();
        let notification = Request {
            jsonrpc: String::from("2.0"),
            id: None,
            method: String::from("notifications/initialized"),
            params: Value::Null,
        };
        let outcome = server.handle(notification).await;
        assert!(outcome.is_none());
    }

    /// Unknown methods are answered with `METHOD_NOT_FOUND`.
    #[tokio::test]
    async fn unknown_method_returns_method_not_found_error() {
        let server = Server::new();
        let request = make_request("bogus/method", json!({}), 1);
        let response = server.handle(request).await.expect("response");
        let failure = response.error.unwrap();
        assert_eq!(failure.code, E_METHOD_NOT_FOUND);
    }

    /// `tools/list` advertises exactly six tools.
    #[tokio::test]
    async fn tools_list_returns_six_tools() {
        let server = Server::new();
        let request = make_request("tools/list", json!({}), 1);
        let response = server.handle(request).await.expect("response");
        let payload = response.result.unwrap();
        let tool_count = payload["tools"].as_array().unwrap().len();
        assert_eq!(tool_count, 6);
    }

    /// A fact added through `memory_add_fact` is visible to `memory_query`.
    #[tokio::test]
    async fn add_fact_then_query_round_trips() {
        let server = Server::new();

        let add_params = json!({
            "name": "memory_add_fact",
            "arguments": { "subject": "color", "body": "the sky is blue" }
        });
        let _added = server
            .handle(make_request("tools/call", add_params, 1))
            .await
            .unwrap();

        let query_params = json!({
            "name": "memory_query",
            "arguments": { "query": "color", "k": 5 }
        });
        let queried = server
            .handle(make_request("tools/call", query_params, 2))
            .await
            .unwrap();

        let payload = queried.result.unwrap();
        let text = payload["content"][0]["text"].as_str().unwrap().to_string();
        assert!(text.contains("color") || text.contains("sky"));
    }

    /// A request with an id but a non-2.0 version tag is rejected.
    #[tokio::test]
    async fn invalid_jsonrpc_version_rejected() {
        let server = Server::new();
        let request = Request {
            jsonrpc: String::from("1.0"),
            id: Some(json!(1)),
            method: String::from("ping"),
            params: Value::Null,
        };
        let response = server.handle(request).await.unwrap();
        let failure = response.error.unwrap();
        assert_eq!(failure.code, E_INVALID_REQUEST);
    }

    /// Asking for a prompt that does not exist yields `INVALID_PARAMS`.
    #[tokio::test]
    async fn prompts_get_unknown_returns_invalid_params() {
        let server = Server::new();
        let params = json!({ "name": "does_not_exist", "arguments": { "query": "x" } });
        let response = server
            .handle(make_request("prompts/get", params, 1))
            .await
            .unwrap();
        let failure = response.error.unwrap();
        assert_eq!(failure.code, E_INVALID_PARAMS);
    }
}