use std::sync::Arc;
use rmcp::handler::server::ServerHandler;
use rmcp::model::{
CallToolRequestParams as CallToolRequestParam, CallToolResult, Content, Implementation,
InitializeRequestParams, InitializeResult, ListToolsResult,
PaginatedRequestParams as PaginatedRequestParam, ProtocolVersion, ServerCapabilities,
ServerInfo, Tool,
};
use rmcp::service::{RequestContext, RoleServer};
use rmcp::{ErrorData as McpError, ServiceExt};
use serde::{Deserialize, Serialize};
use solo_core::{Confidence, DocumentId, EncodingContext, Episode, MemoryId, Tier};
use solo_storage::{TenantHandle, TenantRegistry};
use std::str::FromStr;
/// MCP server handle bound to a single tenant.
///
/// The only field is an `Arc`, so cloning shares the same `Inner` state
/// instead of copying it.
#[derive(Clone)]
pub struct SoloMcpServer {
// Reference-counted state shared by all clones of this handle.
inner: Arc<Inner>,
}
/// State shared by every clone of `SoloMcpServer`.
struct Inner {
// Stored by the constructors; not read in the visible part of this file,
// hence the `dead_code` allowance.
#[allow(dead_code)]
registry: Arc<TenantRegistry>,
// Tenant whose config, embedder, write handle and steward slot the
// handlers in this file go through.
tenant: Arc<TenantHandle>,
// Supplied by the constructors; not read in the visible part of this file.
user_aliases: Vec<String>,
// Principal passed to `remember_as` and to the sampling steward builder;
// `None` when no principal token was resolved.
audit_principal: Option<String>,
}
/// Outcome of `initialize_decision`: what the MCP `initialize` handler
/// should do given the tenant's LLM settings and the client's capabilities.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitializeDecision {
/// No LLM settings, or settings that do not require an MCP peer: proceed
/// without touching the steward slot.
Allow,
/// Settings require an MCP peer and the client advertises sampling: the
/// handler attaches a sampling steward before answering.
PopulateSamplingSteward,
/// Settings require an MCP peer but the client does not advertise
/// sampling: the handler rejects the handshake with an error.
RejectMissingSamplingCapability,
}
/// Decides how the MCP `initialize` handshake should proceed.
///
/// * `llm_settings` — the tenant's optional LLM settings.
/// * `peer_sampling_supported` — whether the client advertised the
///   `sampling` capability.
///
/// Returns `Allow` unless the settings report `requires_mcp_peer()`; in that
/// case the result depends on whether the peer supports sampling.
pub fn initialize_decision(
    llm_settings: &Option<solo_storage::LlmSettings>,
    peer_sampling_supported: bool,
) -> InitializeDecision {
    let backend_needs_peer =
        matches!(llm_settings, Some(settings) if settings.requires_mcp_peer());

    // Guard clause: nothing to wire up when the backend does not call back
    // into the MCP client.
    if !backend_needs_peer {
        return InitializeDecision::Allow;
    }

    if peer_sampling_supported {
        InitializeDecision::PopulateSamplingSteward
    } else {
        InitializeDecision::RejectMissingSamplingCapability
    }
}
/// Builds the multi-line error text returned when the configured LLM backend
/// needs MCP sampling but the connected client does not advertise it.
///
/// The lines are joined with `\n`; there is no trailing newline.
pub fn sampling_capability_missing_error_message() -> String {
    const MESSAGE_LINES: &[&str] = &[
        "LLM backend `mcp_sampling` requires a connected MCP client that",
        "advertises the `sampling` capability at initialize. Either the",
        "current MCP client does not support sampling, or this Solo",
        "process is running in daemon-only mode (no peer to call back).",
        "",
        "Pick one of:",
        "",
        " # Anthropic (hosted):",
        " [llm]",
        " mode = \"anthropic\"",
        " api_key_env = \"ANTHROPIC_API_KEY\"",
        " model = \"claude-sonnet-4-6\"",
        "",
        " # OpenAI (hosted):",
        " [llm]",
        " mode = \"openai\"",
        " api_key_env = \"OPENAI_API_KEY\"",
        " model = \"gpt-5o\"",
        "",
        " # Ollama (local daemon):",
        " [llm]",
        " mode = \"ollama\"",
        " base_url = \"http://localhost:11434\"",
        " model = \"qwen3-coder:30b\"",
        "",
        " # None (cluster-only; abstractions skipped):",
        " [llm]",
        " mode = \"none\"",
        "",
        "See docs/releases/v0.9.0.md \u{00a7}LLM-backend selection for details.",
    ];

    let mut message = String::new();
    for (index, line) in MESSAGE_LINES.iter().enumerate() {
        // Separator goes between lines only, never after the last one.
        if index > 0 {
            message.push('\n');
        }
        message.push_str(line);
    }
    message
}
/// Name of the environment variable consulted when no usable bearer token is
/// present in the header value.
pub const ENV_MCP_PRINCIPAL_TOKEN: &str = "SOLO_MCP_PRINCIPAL_TOKEN";

/// Resolves the principal string for an MCP session.
///
/// Resolution order:
/// 1. `header_value`, when it has the form `Bearer <token>` and the token is
///    non-empty after trimming. The scheme is matched ASCII
///    case-insensitively (`bearer`, `BEARER`, ... are accepted), because HTTP
///    authentication scheme names are case-insensitive.
/// 2. The `SOLO_MCP_PRINCIPAL_TOKEN` environment variable, trimmed, when it is
///    set, valid Unicode and non-empty.
///
/// Returns `None` when neither source yields a non-empty value.
pub fn resolve_mcp_principal(header_value: Option<&str>) -> Option<String> {
    const BEARER_PREFIX: &str = "Bearer ";

    let header_token = header_value
        .and_then(|header| {
            // `get` returns `None` (instead of panicking) when the header is
            // shorter than the prefix or the cut is not a char boundary.
            let scheme = header.get(..BEARER_PREFIX.len())?;
            scheme
                .eq_ignore_ascii_case(BEARER_PREFIX)
                .then(|| header[BEARER_PREFIX.len()..].trim())
        })
        .filter(|token| !token.is_empty());

    if let Some(token) = header_token {
        return Some(token.to_string());
    }

    // Fallback: unset or non-Unicode values both count as "no principal".
    std::env::var(ENV_MCP_PRINCIPAL_TOKEN)
        .ok()
        .map(|value| value.trim().to_string())
        .filter(|value| !value.is_empty())
}
impl SoloMcpServer {
    /// Creates a server for `tenant`, resolving the audit principal without a
    /// header value (so only the environment fallback of
    /// `resolve_mcp_principal` applies).
    pub fn new_for_tenant(
        registry: Arc<TenantRegistry>,
        tenant: Arc<TenantHandle>,
        user_aliases: Vec<String>,
    ) -> Self {
        Self::new_for_tenant_with_principal(
            registry,
            tenant,
            user_aliases,
            resolve_mcp_principal(None),
        )
    }

    /// Creates a server for `tenant` with an explicitly supplied audit
    /// principal (`None` means no principal is attached).
    pub fn new_for_tenant_with_principal(
        registry: Arc<TenantRegistry>,
        tenant: Arc<TenantHandle>,
        user_aliases: Vec<String>,
        audit_principal: Option<String>,
    ) -> Self {
        let shared_state = Inner {
            registry,
            tenant,
            user_aliases,
            audit_principal,
        };
        Self {
            inner: Arc::new(shared_state),
        }
    }
}
/// Serves `server` over the process's stdin/stdout transport and waits until
/// the running service finishes.
///
/// # Errors
/// Propagates any error from starting the service or from waiting on it.
pub async fn serve_stdio(server: SoloMcpServer) -> anyhow::Result<()> {
    let (input, output) = rmcp::transport::io::stdio();
    let service = server.serve((input, output)).await?;
    service.waiting().await?;
    Ok(())
}
/// Arguments of the `memory_remember` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RememberArgs {
/// Text to remember. The handler trims trailing whitespace and rejects
/// the call if nothing is left.
pub content: String,
/// Optional source-type tag; the handler falls back to "user_message".
#[serde(default)]
pub source_type: Option<String>,
/// Optional upstream id, stored on the episode unchanged.
#[serde(default)]
pub source_id: Option<String>,
/// Optional salience; must be finite and within [0.0, 1.0]. The handler
/// falls back to 0.5 when absent.
#[serde(default)]
pub salience: Option<f32>,
}
/// One entry of a `memory_remember_batch` call; carries the same fields as
/// `RememberArgs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RememberItem {
/// Text to remember; must be non-empty after trailing whitespace is trimmed.
pub content: String,
/// Optional source-type tag; the batch handler falls back to "user_message".
#[serde(default)]
pub source_type: Option<String>,
/// Optional upstream id, stored on the episode unchanged.
#[serde(default)]
pub source_id: Option<String>,
/// Optional salience in [0.0, 1.0]; the batch handler falls back to 0.5.
#[serde(default)]
pub salience: Option<f32>,
}
/// Arguments of the `memory_remember_batch` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RememberBatchArgs {
/// Items to save. The handler rejects an empty list and lists longer than
/// `solo_storage::MAX_REMEMBER_BATCH_SIZE`.
pub items: Vec<RememberItem>,
}
/// Checks an optional salience value.
///
/// `None` is always accepted. `Some(s)` is accepted only when `s` is finite
/// and lies in `[0.0, 1.0]`; otherwise an `invalid_params` error naming the
/// offending value is returned.
fn validate_salience(salience: Option<f32>) -> std::result::Result<(), McpError> {
    match salience {
        Some(s) if !(s.is_finite() && (0.0..=1.0).contains(&s)) => {
            Err(McpError::invalid_params(
                format!("salience must be in [0.0, 1.0]; got {s}"),
                None,
            ))
        }
        _ => Ok(()),
    }
}
/// Arguments of the `memory_recall` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallArgs {
/// The query text.
pub query: String,
/// Maximum number of results; defaults to `default_limit()` (5) when omitted.
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Arguments of the `memory_context` tool. Field meanings below follow the
/// tool's published JSON schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryContextArgs {
/// Natural-language query for episodic recall.
pub query: String,
/// Optional subject for structured facts.
#[serde(default)]
pub subject: Option<String>,
/// Optional recency window in days for themes; omitted means unfiltered.
#[serde(default)]
pub window_days: Option<i64>,
/// Per-section maximum results; defaults to `default_limit()` (5).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Serde default for the `limit` field of the argument structs that reference
/// it via `#[serde(default = "default_limit")]`.
fn default_limit() -> usize {
    const DEFAULT_RESULT_LIMIT: usize = 5;
    DEFAULT_RESULT_LIMIT
}
/// Arguments of the `memory_forget` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetArgs {
/// Id of the memory to forget, as a string (the tool schema describes it
/// as a UUID v7 MemoryId).
pub memory_id: String,
/// Free-form reason; defaults to `default_forget_reason()` when omitted.
#[serde(default = "default_forget_reason")]
pub reason: String,
}
/// Serde default for `ForgetArgs::reason`.
fn default_forget_reason() -> String {
    String::from("user-initiated via MCP")
}
/// Arguments of the `memory_inspect` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InspectArgs {
/// Id of the memory to inspect, as a string (UUID v7 per the tool schema).
pub memory_id: String,
}
/// Arguments of the `memory_update` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateArgs {
/// Id of the memory to update, as a string (UUID v7 per the tool schema).
pub memory_id: String,
/// Replacement content for the memory.
pub content: String,
}
/// Arguments of the `memory_themes` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThemesArgs {
/// Optional time window in days; omitted means unfiltered (per the tool schema).
#[serde(default)]
pub window_days: Option<i64>,
/// Maximum number of results; defaults to `default_limit()` (5).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Arguments of the `memory_facts_about` tool. Field meanings below follow
/// the tool's published JSON schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FactsAboutArgs {
/// Subject id to query (required).
pub subject: String,
/// Optional predicate filter (e.g. "works_at").
#[serde(default)]
pub predicate: Option<String>,
/// Optional lower bound in epoch milliseconds.
#[serde(default)]
pub since_ms: Option<i64>,
/// Optional upper bound in epoch milliseconds.
#[serde(default)]
pub until_ms: Option<i64>,
/// When true, also match facts where `subject` appears as the object.
/// Defaults to false when omitted.
#[serde(default)]
pub include_as_object: bool,
/// Maximum number of results; defaults to `default_limit()` (5).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Arguments of the `memory_entities` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EntitiesArgs {
/// Partial or exact entity id to search for (per the tool schema).
pub query: String,
/// Maximum number of results; defaults to `default_limit()` (5).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Arguments of the `memory_contradictions` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContradictionsArgs {
/// Maximum number of results; defaults to `default_limit()` (5).
#[serde(default = "default_limit")]
pub limit: usize,
}
/// Serde default for `ContradictionResolveArgs::status`.
fn default_contradiction_status() -> String {
    String::from("resolved")
}
/// Arguments of the `memory_contradiction_resolve` tool. Field meanings below
/// follow the tool's published JSON schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContradictionResolveArgs {
/// First contradiction id, as returned by `memory_contradictions`.
pub a_id: String,
/// Second contradiction id, as returned by `memory_contradictions`.
pub b_id: String,
/// Contradiction kind, as returned by `memory_contradictions`.
pub kind: String,
/// New lifecycle status; defaults to "resolved" when omitted. The schema
/// lists "unresolved", "resolved" and "reopened".
#[serde(default = "default_contradiction_status")]
pub status: String,
/// Optional human-readable clarification.
#[serde(default)]
pub resolution_note: Option<String>,
/// Optional id of the triple to treat as the current/correct side.
#[serde(default)]
pub winning_triple_id: Option<String>,
}
/// Arguments of the `memory_inspect_cluster` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InspectClusterArgs {
/// Cluster id to inspect (from `memory_themes` hits, per the tool schema).
pub cluster_id: String,
/// When true, episode content is returned verbatim instead of truncated
/// (per the tool schema). Defaults to false when omitted.
#[serde(default)]
pub full_content: bool,
}
/// Arguments of the `memory_ingest_document` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IngestDocumentArgs {
/// Path of the file to ingest; the tool schema describes it as a
/// server-side absolute path readable by the Solo process.
pub path: String,
}
/// Arguments of the `memory_search_docs` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchDocsArgs {
/// The query text.
pub query: String,
/// Maximum number of results; defaults to `default_search_docs_limit()` (5).
#[serde(default = "default_search_docs_limit")]
pub limit: usize,
}
/// Serde default for `SearchDocsArgs::limit`.
fn default_search_docs_limit() -> usize {
    const DEFAULT_SEARCH_DOCS_LIMIT: usize = 5;
    DEFAULT_SEARCH_DOCS_LIMIT
}
/// Arguments of the `memory_inspect_document` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InspectDocumentArgs {
/// Id of the document to inspect, as a string (UUID v7 per the tool schema).
pub doc_id: String,
}
/// Arguments of the `memory_list_documents` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListDocumentsArgs {
/// Maximum results per page; defaults to `default_list_documents_limit()` (20).
#[serde(default = "default_list_documents_limit")]
pub limit: usize,
/// Number of rows to skip for paging; defaults to 0 when omitted.
#[serde(default)]
pub offset: usize,
/// When true, forgotten documents are included as well (per the tool
/// schema). Defaults to false when omitted.
#[serde(default)]
pub include_forgotten: bool,
}
/// Serde default for `ListDocumentsArgs::limit`.
fn default_list_documents_limit() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 20;
    DEFAULT_PAGE_SIZE
}
/// Arguments of the `memory_forget_document` tool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ForgetDocumentArgs {
/// Id of the document to forget, as a string (UUID v7 per the tool schema).
pub doc_id: String,
}
/// MCP protocol surface of the server: server metadata, the `initialize`
/// handshake, tool listing and tool invocation.
impl ServerHandler for SoloMcpServer {
/// Builds the server description sent to clients: tools capability enabled,
/// implementation name "solo" with this crate's version, and the free-text
/// usage instructions.
fn get_info(&self) -> ServerInfo {
let capabilities = ServerCapabilities::builder().enable_tools().build();
// Start from the default value and overwrite individual fields.
let mut info = ServerInfo::default();
info.protocol_version = ProtocolVersion::default();
info.capabilities = capabilities;
info.server_info =
Implementation::new("solo".to_string(), env!("CARGO_PKG_VERSION").to_string());
// Guidance text for the client model describing when to use each tool.
info.instructions = Some(
"Solo gives you persistent memory across conversations \
with this user — what they've told you before, the \
people and projects in their life, and where their \
stated beliefs have shifted, plus a library of \
documents the user has ingested (notes, runbooks, \
PDFs). Reach for these tools whenever the user \
references something from earlier (\"like I \
mentioned\", \"the project I'm working on\", \"my \
friend Alex\", \"the notes I uploaded last week\") \
or asks a question that hinges on personal context \
or document content you don't have in the current \
chat. \
\n\nBest first call for agent work: memory_context \
(one bounded bundle containing recall, themes, \
optional facts, and contradictions). Use the \
narrower tools below when you need more detail or \
a specific operation. \
\n\nTools to write or look up specific moments: \
memory_remember (save something worth keeping), \
memory_update (correct one active saved item), \
memory_recall (search past conversations by topic), \
memory_inspect (show one saved item by id), \
memory_forget (delete one saved item). \
\n\nTools for the bigger picture (populated as the \
user uses Solo over time): memory_themes (recent \
topics they've been thinking about), \
memory_facts_about (what you know about a person, \
project, or place — \"what do you know about \
Alex?\"), memory_entities (discover graph entity \
ids by name), memory_contradictions (places where the \
user has said two things that disagree — surface \
these before answering), memory_contradiction_resolve \
(mark a contradiction resolved or reopened), \
memory_inspect_cluster \
(the raw conversations behind one summary). \
\n\nTools for the user's documents: \
memory_ingest_document (read a file from disk and \
add it to Solo's library), memory_search_docs \
(search across ingested documents by topic — use \
when the user asks about something they wrote down \
or saved as a file), memory_inspect_document (show \
one document's metadata plus a preview of its \
chunks), memory_list_documents (browse documents \
by recency), memory_forget_document (drop a \
document from the library)."
.into(),
);
info
}
/// Handles the MCP `initialize` request.
///
/// Stores the client's initialize params on the peer when none are stored
/// yet, then consults `initialize_decision` with the tenant's LLM settings
/// and the client's advertised sampling capability.
///
/// # Errors
/// Returns an `invalid_request` error carrying
/// `sampling_capability_missing_error_message()` when the LLM backend
/// requires an MCP peer but the client did not advertise sampling.
async fn initialize(
&self,
request: InitializeRequestParams,
context: RequestContext<RoleServer>,
) -> std::result::Result<InitializeResult, McpError> {
// Only set peer info if it has not been recorded already.
if context.peer.peer_info().is_none() {
context.peer.set_peer_info(request.clone());
}
let llm_settings = self.inner.tenant.config().llm.as_ref().cloned();
// The client supports sampling iff it sent a `sampling` capability entry.
let peer_sampling_supported = request.capabilities.sampling.is_some();
match initialize_decision(&llm_settings, peer_sampling_supported) {
InitializeDecision::Allow => {}
InitializeDecision::PopulateSamplingSteward => {
self.populate_sampling_steward(&context).await;
}
InitializeDecision::RejectMissingSamplingCapability => {
return Err(McpError::invalid_request(
sampling_capability_missing_error_message(),
None,
));
}
}
Ok(self.get_info())
}
/// Lists every tool in a single page; the pagination request is ignored
/// and no continuation cursor is returned.
async fn list_tools(
&self,
_request: Option<PaginatedRequestParam>,
_context: RequestContext<RoleServer>,
) -> std::result::Result<ListToolsResult, McpError> {
Ok(ListToolsResult {
tools: build_tools(),
next_cursor: None,
..Default::default()
})
}
/// Invokes a tool by name by forwarding to `dispatch_tool`.
async fn call_tool(
&self,
request: CallToolRequestParam,
_context: RequestContext<RoleServer>,
) -> std::result::Result<CallToolResult, McpError> {
let CallToolRequestParam {
name, arguments, ..
} = request;
// Missing arguments are treated as an empty JSON object.
let args_value = serde_json::Value::Object(arguments.unwrap_or_default());
// No progress reporter is supplied on this path.
self.dispatch_tool(&name, args_value, None).await
}
}
impl SoloMcpServer {
    /// Builds a sampling-backed steward from the tenant configuration and the
    /// connected peer, then stores it in the tenant's steward slot.
    ///
    /// If `StewardConfig::from_settings_then_env` fails, a warning is logged
    /// and `StewardConfig::default()` is used instead.
    async fn populate_sampling_steward(&self, context: &RequestContext<RoleServer>) {
        let settings = self.inner.tenant.config();
        let steward_cfg = match solo_steward::StewardConfig::from_settings_then_env(
            settings.steward.cluster_min_size,
            settings.steward.cluster_cosine_threshold,
        ) {
            Ok(cfg) => cfg,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "v0.11.1: StewardConfig::from_settings_then_env failed at MCP \
                     initialize; falling back to defaults"
                );
                solo_steward::StewardConfig::default()
            }
        };
        let sampling_cfg = self.inner.tenant.config().sampling.clone();
        let client_peer = context.peer.clone();
        let writer = self.inner.tenant.write().clone();
        let steward = crate::llm::build_sampling_steward(
            client_peer,
            writer,
            self.inner.audit_principal.clone(),
            steward_cfg,
            sampling_cfg.clone(),
        );

        // Install the steward; the write guard stays alive through the log
        // line below, as in the original statement order.
        let slot = self.inner.tenant.steward_slot();
        let mut slot_guard = slot.write().await;
        *slot_guard = Some(steward);
        tracing::info!(
            tenant = %self.inner.tenant.tenant_id(),
            coalesce_window_ms = sampling_cfg.coalesce_window_ms,
            coalesce_max_requests = sampling_cfg.coalesce_max_requests,
            "v0.9.0 P5: MCP-sampling Steward attached to tenant.steward_slot \
             (PeerSamplingClient → SamplingCoordinator → SamplingLlmClient)"
        );
    }

    /// Routes a tool call to its handler.
    ///
    /// `args_value` is deserialized into the argument struct for `name`
    /// before the handler runs. `progress` is forwarded only to the handlers
    /// that accept it (batch remember, document ingest, document search).
    ///
    /// # Errors
    /// Returns `invalid_params` when the arguments fail to deserialize or
    /// when `name` is not a known tool; otherwise whatever the handler
    /// returns.
    pub async fn dispatch_tool(
        &self,
        name: &str,
        args_value: serde_json::Value,
        progress: Option<crate::mcp_progress::ProgressReporter>,
    ) -> std::result::Result<CallToolResult, McpError> {
        let raw = &args_value;
        match name {
            "memory_remember" => {
                self.handle_remember(parse_args::<RememberArgs>(raw)?)
                    .await
            }
            "memory_remember_batch" => {
                self.handle_remember_batch(parse_args::<RememberBatchArgs>(raw)?, progress)
                    .await
            }
            "memory_recall" => self.handle_recall(parse_args::<RecallArgs>(raw)?).await,
            "memory_context" => {
                self.handle_memory_context(parse_args::<MemoryContextArgs>(raw)?)
                    .await
            }
            "memory_forget" => self.handle_forget(parse_args::<ForgetArgs>(raw)?).await,
            "memory_inspect" => self.handle_inspect(parse_args::<InspectArgs>(raw)?).await,
            "memory_update" => self.handle_update(parse_args::<UpdateArgs>(raw)?).await,
            "memory_themes" => self.handle_themes(parse_args::<ThemesArgs>(raw)?).await,
            "memory_facts_about" => {
                self.handle_facts_about(parse_args::<FactsAboutArgs>(raw)?)
                    .await
            }
            "memory_entities" => {
                self.handle_entities(parse_args::<EntitiesArgs>(raw)?)
                    .await
            }
            "memory_contradictions" => {
                self.handle_contradictions(parse_args::<ContradictionsArgs>(raw)?)
                    .await
            }
            "memory_contradiction_resolve" => {
                self.handle_contradiction_resolve(parse_args::<ContradictionResolveArgs>(raw)?)
                    .await
            }
            "memory_inspect_cluster" => {
                self.handle_inspect_cluster(parse_args::<InspectClusterArgs>(raw)?)
                    .await
            }
            "memory_ingest_document" => {
                self.handle_ingest_document(parse_args::<IngestDocumentArgs>(raw)?, progress)
                    .await
            }
            "memory_search_docs" => {
                self.handle_search_docs(parse_args::<SearchDocsArgs>(raw)?, progress)
                    .await
            }
            "memory_inspect_document" => {
                self.handle_inspect_document(parse_args::<InspectDocumentArgs>(raw)?)
                    .await
            }
            "memory_list_documents" => {
                self.handle_list_documents(parse_args::<ListDocumentsArgs>(raw)?)
                    .await
            }
            "memory_forget_document" => {
                self.handle_forget_document(parse_args::<ForgetDocumentArgs>(raw)?)
                    .await
            }
            unknown => Err(McpError::invalid_params(
                format!("unknown tool `{unknown}`"),
                None,
            )),
        }
    }

    /// Returns the same tool catalogue that `list_tools` serves.
    pub fn dispatch_list_tools(&self) -> Vec<Tool> {
        build_tools()
    }
}
/// Deserializes a tool's JSON arguments into its typed argument struct.
///
/// Deserializes straight from the borrowed `Value` (`&serde_json::Value`
/// implements `serde::Deserializer`), so the argument tree is not cloned on
/// every tool call.
///
/// # Errors
/// Returns `invalid_params` with the text `invalid tool arguments: <cause>`
/// when the value does not match `T`.
fn parse_args<T: serde::de::DeserializeOwned>(
    v: &serde_json::Value,
) -> std::result::Result<T, McpError> {
    T::deserialize(v)
        .map_err(|e| McpError::invalid_params(format!("invalid tool arguments: {e}"), None))
}
fn solo_to_mcp(e: solo_core::Error) -> McpError {
use solo_core::Error;
match e {
Error::NotFound(msg) => McpError::invalid_params(msg, None),
Error::InvalidInput(msg) => McpError::invalid_params(msg, None),
Error::Conflict(msg) => McpError::invalid_params(msg, None),
other => McpError::internal_error(other.to_string(), None),
}
}
/// Builds the static tool catalogue served by `list_tools` and
/// `dispatch_list_tools`.
///
/// Each entry is a `Tool` made of a name, a description written for the
/// client model, and a JSON-schema object describing the arguments. The
/// names match the arms of `dispatch_tool` and the entries of `tool_names`.
fn build_tools() -> Vec<Tool> {
vec![
// memory_remember: save a single item.
Tool::new(
"memory_remember",
"Save something the user has told you — a fact, a \
preference, a name, a date, a context — so you can pick \
it up next conversation. Use whenever the user mentions \
something they'd reasonably expect you to recall later \
(\"I just started at Quotient\", \"my partner is Maya\"). \
Returns the saved item's id.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The text to remember.",
},
"source_type": {
"type": "string",
"description": "Optional source-type tag (default: \"user_message\"). See docs/mcp/source-types.md for convention values.",
},
"source_id": {
"type": "string",
"description": "Optional upstream id for traceability.",
},
"salience": {
"type": "number",
"description": "Optional salience in [0.0, 1.0]; defaults to 0.5. Higher values bias toward recall ranking + retention. v0.9.2+.",
"minimum": 0.0,
"maximum": 1.0,
},
},
"required": ["content"],
})),
),
// memory_remember_batch: save several items in one call; the size cap
// comes from `solo_storage::MAX_REMEMBER_BATCH_SIZE`.
Tool::new(
"memory_remember_batch",
"Save several items atomically in one transaction — either \
every item lands or none does. Use this when you have a \
collection of related episodes from one logical step (a \
conversation turn, a tool-output bundle, an ingest batch) \
and partial success would leave the user's memory in a \
confusing half-state. Each item carries the same fields as \
memory_remember (content + optional source_type, source_id, \
salience). Returns an ordered array of memory_ids matching \
the input items. v0.9.2+.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"items": {
"type": "array",
"description": format!(
"Items to remember atomically. Max {} per call.",
solo_storage::MAX_REMEMBER_BATCH_SIZE,
),
"minItems": 1,
"maxItems": solo_storage::MAX_REMEMBER_BATCH_SIZE,
"items": {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "The text to remember.",
},
"source_type": {
"type": "string",
"description": "Optional source-type tag (default: \"user_message\"). See docs/mcp/source-types.md.",
},
"source_id": {
"type": "string",
"description": "Optional upstream id for traceability.",
},
"salience": {
"type": "number",
"description": "Optional salience in [0.0, 1.0]; defaults to 0.5.",
"minimum": 0.0,
"maximum": 1.0,
},
},
"required": ["content"],
},
},
},
"required": ["items"],
})),
),
// memory_recall: search saved items by query text.
Tool::new(
"memory_recall",
"Search past conversations with this user by topic or \
phrase. Returns up to `limit` of the closest matches, \
best match first. Use when the user references \
something they said before (\"that book I told you \
about\", \"the bug we were debugging last week\"). \
Skips items the user has deleted.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query text.",
},
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": ["query"],
})),
),
// memory_context: combined bundle (recall, themes, facts, contradictions).
Tool::new(
"memory_context",
"Build a compact working-memory bundle for an agent turn. \
Use this near the start of a substantial answer or task \
when remembered context may matter. It combines raw \
episodic recall, recent themes, optional structured facts \
about `subject`, and known contradictions so clients can \
ground answers without making four separate calls.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural-language query for episodic recall.",
},
"subject": {
"type": "string",
"description": "Optional subject for structured facts. When present, facts also match object-position references.",
},
"window_days": {
"type": "integer",
"description": "Optional recency window in days for themes. Omit for unfiltered.",
"minimum": 1,
},
"limit": {
"type": "integer",
"description": "Per-section maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": ["query"],
})),
),
// memory_forget: delete one saved item by id.
Tool::new(
"memory_forget",
"Delete one saved item by id. Use when the user asks you \
to forget something specific (\"forget that I said \
X\"). The item stops appearing in future recalls. \
Reversible only via backups.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "MemoryId to forget (UUID v7).",
},
"reason": {
"type": "string",
"description": "Optional free-form reason (logged, not yet persisted).",
},
},
"required": ["memory_id"],
})),
),
// memory_inspect: show one saved item by id.
Tool::new(
"memory_inspect",
"Show the full record for one saved item — when it was \
saved, where it came from, and the full text. Use after \
memory_recall when you want the complete content of a \
specific hit (recall results may be truncated).",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "MemoryId to inspect (UUID v7).",
},
},
"required": ["memory_id"],
})),
),
// memory_update: replace the content of one saved item.
Tool::new(
"memory_update",
"Correct one active saved memory and refresh its embedding \
and search index entry. Use when the user says a remembered \
episode is wrong or outdated and provides the corrected \
wording. Returns the updated memory id, rowid, content, and \
timestamp.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"memory_id": {
"type": "string",
"description": "MemoryId to update (UUID v7).",
},
"content": {
"type": "string",
"description": "Replacement content for the active memory.",
"minLength": 1,
},
},
"required": ["memory_id", "content"],
})),
),
// memory_themes: recent topics, optionally windowed by days.
Tool::new(
"memory_themes",
"Recent topics the user has been thinking about. Use to \
orient yourself at the start of a conversation, or when \
the user asks \"what have I been up to\" / \"what was I \
working on last week\". Pass `window_days` to scope \
(e.g. 7 for last week); omit for all-time.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"window_days": {
"type": "integer",
"description": "Optional time window in days. Omit for unfiltered.",
"minimum": 1,
},
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": [],
})),
),
// memory_facts_about: structured facts for one subject.
Tool::new(
"memory_facts_about",
"Look up what you remember about a person, project, or \
topic — names, dates, preferences, relationships. Use \
when the user asks \"what do you know about Alex?\", \
\"when did I start at Quotient?\", \"who is Maya?\", or \
whenever you need grounded facts about someone or \
something before answering. Subject is required (the \
person/place/thing you're asking about); narrow further \
with `predicate` (\"works_at\", \"lives_in\") or a date \
range. Set `include_as_object=true` to also surface \
facts where the subject appears on the receiving side of \
a relationship (e.g. \"Sam pushes back on PRs about \
Maya\" surfaces under facts_about(subject=\"Maya\", \
include_as_object=true)). (Backed by \
subject-predicate-object triples distilled from past \
conversations.) Clients should set a 30s timeout on this \
call; if exceeded, retry once or fall back to \
`memory_recall`.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"subject": {
"type": "string",
"description": "Subject id to query (e.g. 'Sam').",
},
"predicate": {
"type": "string",
"description": "Optional predicate filter (e.g. 'works_at').",
},
"since_ms": {
"type": "integer",
"description": "Optional valid_from_ms lower bound (epoch ms).",
},
"until_ms": {
"type": "integer",
"description": "Optional valid_to_ms upper bound (epoch ms). NULL upper bounds (still-valid facts) pass through.",
},
"include_as_object": {
"type": "boolean",
"description": "If true, also match facts where `subject` appears as the object (e.g. 'Sam pushes back on PRs about Maya' surfaces under subject='Maya'). Default false.",
"default": false,
},
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": ["subject"],
})),
),
// memory_entities: look up entity ids by (partial) name.
Tool::new(
"memory_entities",
"Discover entity ids from the structured-fact graph. Use \
before memory_facts_about when you are not sure how a \
person, project, or topic is keyed in memory, or when the \
user gives a partial name. Returns entity ids with fact \
counts and common predicates.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Partial or exact entity id to search for.",
"minLength": 1,
},
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": ["query"],
})),
),
// memory_contradictions: list conflicting statements.
Tool::new(
"memory_contradictions",
"Find places where the user's stated beliefs or facts \
disagree across conversations — flag disagreements \
before answering. Use whenever you're about to rely on \
a remembered fact that could have changed (jobs, \
relationships, preferences, opinions); a disagreement \
here means the user has told you both X and not-X over \
time and you should ask which is current instead of \
guessing. Each result shows both conflicting statements \
with the topic.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": [],
})),
),
// memory_contradiction_resolve: change a contradiction's lifecycle status.
Tool::new(
"memory_contradiction_resolve",
"Mark one flagged contradiction as resolved, unresolved, \
or reopened. Use after the user clarifies which side is \
current. Pass the a_id, b_id, and kind from \
memory_contradictions; status defaults to resolved.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"a_id": {
"type": "string",
"description": "First contradiction id from memory_contradictions.",
},
"b_id": {
"type": "string",
"description": "Second contradiction id from memory_contradictions.",
},
"kind": {
"type": "string",
"description": "Contradiction kind from memory_contradictions.",
},
"status": {
"type": "string",
"enum": ["unresolved", "resolved", "reopened"],
"default": "resolved",
"description": "New lifecycle status.",
},
"resolution_note": {
"type": "string",
"description": "Optional human-readable clarification.",
},
"winning_triple_id": {
"type": "string",
"description": "Optional triple id to treat as the current/correct side.",
},
},
"required": ["a_id", "b_id", "kind"],
})),
),
// memory_inspect_cluster: show the sources behind one theme summary.
Tool::new(
"memory_inspect_cluster",
"Show the raw conversations behind one summary. Returns \
the one-line topic (the LLM-generated summary) and the \
source conversations the topic was built from. Use \
after memory_themes when the user asks \"show me the \
raw context behind this\" or \"why does Solo think \
that about cluster Y\". Source items are truncated to \
200 chars unless `full_content` is set.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"cluster_id": {
"type": "string",
"description": "Cluster id to inspect (from memory_themes hits).",
},
"full_content": {
"type": "boolean",
"description": "If true, episode content is returned verbatim. Default false (truncate to 200 chars + ellipsis).",
},
},
"required": ["cluster_id"],
})),
),
// memory_ingest_document: add a file to the document library.
Tool::new(
"memory_ingest_document",
"Read a file from disk and add it to the user's document \
library so it becomes searchable alongside past \
conversations. Use when the user asks you to remember a \
whole file (\"add my notes/runbook.md\", \"ingest this \
PDF\"). The file is split into ~500-token chunks and \
each chunk is embedded; chunks then surface through \
memory_search_docs. Returns the new document id, chunk \
count, and a `deduped` flag (true if the same content \
was already ingested under another id).",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Server-side absolute path to the file to ingest. The file must be readable by the Solo process.",
},
},
"required": ["path"],
})),
),
// memory_search_docs: search ingested document chunks.
Tool::new(
"memory_search_docs",
"Search across the user's ingested documents by topic or \
phrase. Returns up to `limit` matching chunks, best \
match first, each with the parent document's title + \
source path so you can cite where the answer came from. \
Use when the user asks a question that hinges on \
material they've added as a file (\"what does my \
runbook say about backups?\", \"find the section in the \
notes about the new policy\"). Forgotten documents are \
skipped.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query text.",
},
"limit": {
"type": "integer",
"description": "Maximum results (default 5).",
"minimum": 1,
"maximum": 100,
},
},
"required": ["query"],
})),
),
// memory_inspect_document: metadata and chunk previews for one document.
Tool::new(
"memory_inspect_document",
"Show one document's metadata plus a preview of every \
chunk it was split into. Use after memory_search_docs \
when the user wants the bigger picture for one hit \
(\"show me the whole document this came from\"), or \
after memory_list_documents to drill into one entry. \
Each chunk preview is truncated to 200 chars.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"doc_id": {
"type": "string",
"description": "Document id to inspect (UUID v7).",
},
},
"required": ["doc_id"],
})),
),
// memory_list_documents: paginated document index. Unlike the other
// all-optional schemas here, this one has no explicit `"required": []`.
Tool::new(
"memory_list_documents",
"List the user's ingested documents, newest first. Use \
when the user asks \"what documents have I added?\" or \
\"show me my files\". Returns a paginated index — pass \
`offset` to page further back. Forgotten documents are \
hidden by default; set `include_forgotten=true` to see \
them too.",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Maximum results per page (default 20).",
"minimum": 1,
"maximum": 100,
},
"offset": {
"type": "integer",
"description": "Number of rows to skip (for paging). Default 0.",
"minimum": 0,
},
"include_forgotten": {
"type": "boolean",
"description": "If true, also include documents the user has forgotten. Default false.",
},
},
})),
),
// memory_forget_document: drop one document by id.
Tool::new(
"memory_forget_document",
"Drop one document from the user's library by id. Use \
when the user asks you to forget a specific file \
(\"forget my old runbook\"). The document's chunks stop \
appearing in memory_search_docs and the vectors are \
tombstoned in the index. The chunk rows themselves are \
kept for forensic value (a future restore command can \
undo this).",
json_schema_object(serde_json::json!({
"type": "object",
"properties": {
"doc_id": {
"type": "string",
"description": "Document id to forget (UUID v7).",
},
},
"required": ["doc_id"],
})),
),
]
}
/// Unwraps a JSON value that is known to be an object into its map.
///
/// # Panics
/// Panics if `value` is not a JSON object.
fn json_schema_object(value: serde_json::Value) -> serde_json::Map<String, serde_json::Value> {
    if let serde_json::Value::Object(fields) = value {
        return fields;
    }
    panic!("json_schema_object: input must be an object")
}
/// Returns the names of all MCP tools, in the same order as the catalogue
/// built by `build_tools`.
pub fn tool_names() -> Vec<&'static str> {
    const TOOL_NAMES: [&str; 18] = [
        "memory_remember",
        "memory_remember_batch",
        "memory_recall",
        "memory_context",
        "memory_forget",
        "memory_inspect",
        "memory_update",
        "memory_themes",
        "memory_facts_about",
        "memory_entities",
        "memory_contradictions",
        "memory_contradiction_resolve",
        "memory_inspect_cluster",
        "memory_ingest_document",
        "memory_search_docs",
        "memory_inspect_document",
        "memory_list_documents",
        "memory_forget_document",
    ];
    TOOL_NAMES.to_vec()
}
impl SoloMcpServer {
async fn handle_remember(
&self,
args: RememberArgs,
) -> std::result::Result<CallToolResult, McpError> {
let content = args.content.trim_end().to_string();
if content.is_empty() {
return Err(McpError::invalid_params(
"memory_remember: content must not be empty".to_string(),
None,
));
}
validate_salience(args.salience)?;
let embedding: solo_core::Embedding = self
.inner
.tenant
.embedder()
.embed(&content)
.await
.map_err(solo_to_mcp)?;
let episode = Episode {
memory_id: MemoryId::new(),
ts_ms: chrono::Utc::now().timestamp_millis(),
source_type: args.source_type.unwrap_or_else(|| "user_message".into()),
source_id: args.source_id,
content,
encoding_context: EncodingContext::default(),
provenance: None,
confidence: Confidence::new(0.9).expect("0.9 is in [0.0, 1.0]"),
strength: 0.5,
salience: args.salience.unwrap_or(0.5),
tier: Tier::Hot,
};
let mid = self
.inner
.tenant
.write()
.remember_as(self.inner.audit_principal.clone(), episode, embedding)
.await
.map_err(solo_to_mcp)?;
Ok(CallToolResult::success(vec![Content::text(format!(
"remembered {mid}"
))]))
}
/// Stores a batch of memories and replies with a JSON array of their ids.
///
/// The whole batch is validated before any embedding happens: it must be
/// non-empty, no larger than `solo_storage::MAX_REMEMBER_BATCH_SIZE`, and every
/// item needs non-empty content (after stripping trailing whitespace) and a
/// salience accepted by `validate_salience`. Items are then embedded one at a
/// time and handed to the writer in a single `remember_batch_as` call. All
/// episodes in the batch share one timestamp.
///
/// Progress is only reported when a reporter was supplied and the batch holds
/// more than `MCP_REMEMBER_BATCH_PROGRESS_ITEM_THRESHOLD` items.
///
/// # Errors
///
/// Validation failures are `invalid_params`; embedder and writer failures are
/// converted with `solo_to_mcp`; a reply that cannot be serialized is an
/// internal error.
async fn handle_remember_batch(
    &self,
    args: RememberBatchArgs,
    progress: Option<crate::mcp_progress::ProgressReporter>,
) -> std::result::Result<CallToolResult, McpError> {
    let item_count = args.items.len();
    if item_count == 0 {
        return Err(McpError::invalid_params(
            "memory_remember_batch: items must not be empty".to_string(),
            None,
        ));
    }
    if item_count > solo_storage::MAX_REMEMBER_BATCH_SIZE {
        let message = format!(
            "memory_remember_batch: {} items exceeds MAX_REMEMBER_BATCH_SIZE = {}",
            item_count,
            solo_storage::MAX_REMEMBER_BATCH_SIZE,
        );
        return Err(McpError::invalid_params(message, None));
    }

    // Reject the batch on the first bad item, before spending any embedding work.
    for (i, item) in args.items.iter().enumerate() {
        if item.content.trim_end().is_empty() {
            return Err(McpError::invalid_params(
                format!("memory_remember_batch: items[{i}].content must not be empty"),
                None,
            ));
        }
        if let Err(e) = validate_salience(item.salience) {
            return Err(McpError::invalid_params(
                format!("memory_remember_batch: items[{i}].{}", e.message),
                None,
            ));
        }
    }

    let total = item_count as u64;
    // Small batches stay silent even when the caller supplied a reporter.
    let progress_reporter = match progress.as_ref() {
        Some(reporter)
            if item_count > crate::mcp_progress::MCP_REMEMBER_BATCH_PROGRESS_ITEM_THRESHOLD =>
        {
            Some(reporter)
        }
        _ => None,
    };

    let embedder = self.inner.tenant.embedder();
    let now_ms = chrono::Utc::now().timestamp_millis();
    let mut pairs: Vec<(Episode, solo_core::Embedding)> = Vec::with_capacity(item_count);
    for (i, item) in args.items.into_iter().enumerate() {
        let content = item.content.trim_end().to_owned();
        let embedding = embedder.embed(&content).await.map_err(solo_to_mcp)?;
        let episode = Episode {
            memory_id: MemoryId::new(),
            ts_ms: now_ms,
            source_type: item.source_type.unwrap_or_else(|| "user_message".into()),
            source_id: item.source_id,
            content,
            encoding_context: EncodingContext::default(),
            provenance: None,
            confidence: Confidence::new(0.9).expect("0.9 is in [0.0, 1.0]"),
            strength: 0.5,
            salience: item.salience.unwrap_or(0.5),
            tier: Tier::Hot,
        };
        pairs.push((episode, embedding));

        let embedded_so_far = i + 1;
        if embedded_so_far % crate::mcp_progress::MCP_REMEMBER_BATCH_PROGRESS_EMIT_EVERY == 0 {
            crate::mcp_progress::report_if_some(
                progress_reporter,
                embedded_so_far as u64,
                Some(total),
                Some("embedding"),
            );
        }
    }
    crate::mcp_progress::report_if_some(
        progress_reporter,
        total,
        Some(total),
        Some("embedded"),
    );

    let principal = self.inner.audit_principal.clone();
    let memory_ids = self
        .inner
        .tenant
        .write()
        .remember_batch_as(principal, pairs)
        .await
        .map_err(solo_to_mcp)?;
    crate::mcp_progress::report_if_some(
        progress_reporter,
        total,
        Some(total),
        Some("inserted"),
    );

    let ids_as_strings: Vec<String> = memory_ids.iter().map(|id| id.to_string()).collect();
    let body = match serde_json::to_string(&ids_as_strings) {
        Ok(json) => json,
        Err(e) => {
            return Err(McpError::internal_error(
                format!("serialize batch reply: {e}"),
                None,
            ));
        }
    };
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
async fn handle_recall(
&self,
args: RecallArgs,
) -> std::result::Result<CallToolResult, McpError> {
let result = solo_query::run_recall(
self.inner.tenant.as_ref(),
self.inner.audit_principal.clone(),
&args.query,
args.limit,
)
.await
.map_err(solo_to_mcp)?;
let body = serde_json::to_string_pretty(&result.hits).unwrap_or_else(|_| "[]".to_string());
let mut contents = vec![Content::text(body)];
if result.hits.is_empty() {
contents.push(Content::text(format!(
"(index has {} vectors)",
result.index_len
)));
}
Ok(CallToolResult::success(contents))
}
async fn handle_memory_context(
&self,
args: MemoryContextArgs,
) -> std::result::Result<CallToolResult, McpError> {
let result = solo_query::memory_context(
self.inner.tenant.as_ref(),
self.inner.audit_principal.clone(),
&args.query,
args.subject.as_deref(),
&self.inner.user_aliases,
args.window_days,
args.limit,
)
.await
.map_err(solo_to_mcp)?;
let body = serde_json::to_string_pretty(&result).unwrap_or_else(|_| String::new());
Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Forgets the memory named by `args.memory_id` and replies with
/// `forgotten <id>`.
///
/// # Errors
///
/// An id that does not parse as a `MemoryId` is `invalid_params`; writer
/// failures are converted with `solo_to_mcp`.
async fn handle_forget(
    &self,
    args: ForgetArgs,
) -> std::result::Result<CallToolResult, McpError> {
    let mid = match MemoryId::from_str(&args.memory_id) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(McpError::invalid_params(
                format!("invalid memory_id: {e}"),
                None,
            ));
        }
    };

    let principal = self.inner.audit_principal.clone();
    let writer = self.inner.tenant.write();
    writer
        .forget_as(principal, mid, args.reason)
        .await
        .map_err(solo_to_mcp)?;

    let reply = format!("forgotten {mid}");
    Ok(CallToolResult::success(vec![Content::text(reply)]))
}
/// Looks up one memory via `solo_query::inspect_one` and replies with the row
/// as pretty-printed JSON.
///
/// # Errors
///
/// An id that does not parse as a `MemoryId` is `invalid_params`; lookup
/// failures are converted with `solo_to_mcp`. A reply that cannot be
/// serialized is reported as an internal error instead of an empty success
/// body.
async fn handle_inspect(
    &self,
    args: InspectArgs,
) -> std::result::Result<CallToolResult, McpError> {
    let mid = MemoryId::from_str(&args.memory_id)
        .map_err(|e| McpError::invalid_params(format!("invalid memory_id: {e}"), None))?;
    let row = solo_query::inspect_one(
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        mid,
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&row).map_err(|e| {
        McpError::internal_error(format!("memory_inspect: serialize reply: {e}"), None)
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Replaces the content of an existing memory via `solo_query::memory_update`
/// and replies with the result as pretty-printed JSON.
///
/// The emptiness check uses the trimmed content, but the content is passed on
/// to `memory_update` untrimmed.
///
/// # Errors
///
/// An unparseable `memory_id` or whitespace-only content is `invalid_params`;
/// update failures are converted with `solo_to_mcp`. A reply that cannot be
/// serialized is reported as an internal error instead of an empty success
/// body.
async fn handle_update(
    &self,
    args: UpdateArgs,
) -> std::result::Result<CallToolResult, McpError> {
    let mid = MemoryId::from_str(&args.memory_id)
        .map_err(|e| McpError::invalid_params(format!("invalid memory_id: {e}"), None))?;
    if args.content.trim().is_empty() {
        return Err(McpError::invalid_params(
            "memory_update: content must not be empty".to_string(),
            None,
        ));
    }
    let result = solo_query::memory_update(
        self.inner.tenant.as_ref(),
        self.inner.audit_principal.clone(),
        mid,
        &args.content,
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&result).map_err(|e| {
        McpError::internal_error(format!("memory_update: serialize reply: {e}"), None)
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
async fn handle_themes(
&self,
args: ThemesArgs,
) -> std::result::Result<CallToolResult, McpError> {
let hits = solo_query::themes(
self.inner.tenant.read(),
self.inner.tenant.audit(),
self.inner.audit_principal.clone(),
args.window_days,
args.limit,
)
.await
.map_err(solo_to_mcp)?;
let body = serde_json::to_string_pretty(&hits).unwrap_or_else(|_| String::new());
Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Calls `solo_query::facts_about` for `args.subject` and replies with the
/// result as pretty-printed JSON.
///
/// The optional predicate, time bounds, `include_as_object` flag and limit are
/// forwarded from `args`; the user aliases come from the server state.
///
/// # Errors
///
/// A whitespace-only subject is `invalid_params`; query failures are converted
/// with `solo_to_mcp`. A reply that cannot be serialized is reported as an
/// internal error instead of an empty success body.
async fn handle_facts_about(
    &self,
    args: FactsAboutArgs,
) -> std::result::Result<CallToolResult, McpError> {
    if args.subject.trim().is_empty() {
        return Err(McpError::invalid_params(
            "memory_facts_about: subject must not be empty".to_string(),
            None,
        ));
    }
    let hits = solo_query::facts_about(
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        &args.subject,
        &self.inner.user_aliases,
        args.include_as_object,
        args.predicate.as_deref(),
        args.since_ms,
        args.until_ms,
        args.limit,
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&hits).map_err(|e| {
        McpError::internal_error(format!("memory_facts_about: serialize reply: {e}"), None)
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Calls `solo_query::entities` with the caller's query and limit and replies
/// with the result as pretty-printed JSON.
///
/// # Errors
///
/// A whitespace-only query is `invalid_params`; query failures are converted
/// with `solo_to_mcp`. A reply that cannot be serialized is reported as an
/// internal error instead of an empty success body.
async fn handle_entities(
    &self,
    args: EntitiesArgs,
) -> std::result::Result<CallToolResult, McpError> {
    if args.query.trim().is_empty() {
        return Err(McpError::invalid_params(
            "memory_entities: query must not be empty".to_string(),
            None,
        ));
    }
    let hits = solo_query::entities(
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        &args.query,
        args.limit,
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&hits).map_err(|e| {
        McpError::internal_error(format!("memory_entities: serialize reply: {e}"), None)
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
async fn handle_contradictions(
&self,
args: ContradictionsArgs,
) -> std::result::Result<CallToolResult, McpError> {
let hits = solo_query::contradictions(
self.inner.tenant.read(),
self.inner.tenant.audit(),
self.inner.audit_principal.clone(),
args.limit,
)
.await
.map_err(solo_to_mcp)?;
let body = serde_json::to_string_pretty(&hits).unwrap_or_else(|_| String::new());
Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Calls `solo_query::resolve_contradiction` for the contradiction identified
/// by `a_id`, `b_id` and `kind`, and replies with the result as pretty-printed
/// JSON.
///
/// The status, optional resolution note and optional winning triple id are
/// forwarded from `args` unchanged.
///
/// # Errors
///
/// A whitespace-only `a_id`, `b_id` or `kind` is `invalid_params`; resolve
/// failures are converted with `solo_to_mcp`. A reply that cannot be
/// serialized is reported as an internal error instead of an empty success
/// body.
async fn handle_contradiction_resolve(
    &self,
    args: ContradictionResolveArgs,
) -> std::result::Result<CallToolResult, McpError> {
    if args.a_id.trim().is_empty() || args.b_id.trim().is_empty() || args.kind.trim().is_empty()
    {
        return Err(McpError::invalid_params(
            "memory_contradiction_resolve: a_id, b_id, and kind must not be empty".to_string(),
            None,
        ));
    }
    let result = solo_query::resolve_contradiction(
        self.inner.tenant.write(),
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        &args.a_id,
        &args.b_id,
        &args.kind,
        &args.status,
        args.resolution_note.as_deref(),
        args.winning_triple_id.as_deref(),
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&result).map_err(|e| {
        McpError::internal_error(
            format!("memory_contradiction_resolve: serialize reply: {e}"),
            None,
        )
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Calls `solo_query::inspect_cluster` for `args.cluster_id` and replies with
/// the record as pretty-printed JSON.
///
/// `args.full_content` is forwarded unchanged.
///
/// # Errors
///
/// A whitespace-only `cluster_id` is `invalid_params`; lookup failures are
/// converted with `solo_to_mcp`. A reply that cannot be serialized is reported
/// as an internal error instead of an empty success body.
async fn handle_inspect_cluster(
    &self,
    args: InspectClusterArgs,
) -> std::result::Result<CallToolResult, McpError> {
    if args.cluster_id.trim().is_empty() {
        return Err(McpError::invalid_params(
            "memory_inspect_cluster: cluster_id must not be empty".to_string(),
            None,
        ));
    }
    let record = solo_query::inspect_cluster(
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        &args.cluster_id,
        args.full_content,
    )
    .await
    .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&record).map_err(|e| {
        McpError::internal_error(
            format!("memory_inspect_cluster: serialize reply: {e}"),
            None,
        )
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Ingests the document at `args.path` with the default chunk configuration
/// and replies with the ingest report as pretty-printed JSON.
///
/// Four progress steps are reported when a reporter is supplied. Steps 1
/// (`parsed`) and 2 (`chunked`) are emitted before the single
/// `ingest_document_as` call, and steps 3 (`embedded`) and 4
/// (`inserted N chunks`) after it returns, so they bracket the call rather
/// than track its internal phases.
///
/// # Errors
///
/// A whitespace-only path is `invalid_params`; ingest failures are converted
/// with `solo_to_mcp`. A reply that cannot be serialized is reported as an
/// internal error instead of an empty success body.
async fn handle_ingest_document(
    &self,
    args: IngestDocumentArgs,
    progress: Option<crate::mcp_progress::ProgressReporter>,
) -> std::result::Result<CallToolResult, McpError> {
    if args.path.trim().is_empty() {
        return Err(McpError::invalid_params(
            "memory_ingest_document: path must not be empty".to_string(),
            None,
        ));
    }
    let path = std::path::PathBuf::from(args.path);
    let chunk_config = solo_storage::document::ChunkConfig::default();
    const INGEST_TOTAL_PHASES: u64 = 4;
    crate::mcp_progress::report_if_some(
        progress.as_ref(),
        1,
        Some(INGEST_TOTAL_PHASES),
        Some("parsed"),
    );
    crate::mcp_progress::report_if_some(
        progress.as_ref(),
        2,
        Some(INGEST_TOTAL_PHASES),
        Some("chunked"),
    );
    let report = self
        .inner
        .tenant
        .write()
        .ingest_document_as(self.inner.audit_principal.clone(), path, chunk_config)
        .await
        .map_err(solo_to_mcp)?;
    crate::mcp_progress::report_if_some(
        progress.as_ref(),
        3,
        Some(INGEST_TOTAL_PHASES),
        Some("embedded"),
    );
    crate::mcp_progress::report_if_some(
        progress.as_ref(),
        INGEST_TOTAL_PHASES,
        Some(INGEST_TOTAL_PHASES),
        Some(&format!("inserted {} chunks", report.chunks_persisted)),
    );
    let body = serde_json::to_string_pretty(&report).map_err(|e| {
        McpError::internal_error(
            format!("memory_ingest_document: serialize reply: {e}"),
            None,
        )
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
async fn handle_search_docs(
&self,
args: SearchDocsArgs,
progress: Option<crate::mcp_progress::ProgressReporter>,
) -> std::result::Result<CallToolResult, McpError> {
let top_k = args.limit as u32;
let progress_active = progress.is_some()
&& top_k > crate::mcp_progress::MCP_SEARCH_DOCS_PROGRESS_TOP_K_THRESHOLD;
let progress_reporter = if progress_active {
progress.as_ref()
} else {
None
};
const SEARCH_TOTAL_PHASES: u64 = 3;
crate::mcp_progress::report_if_some(
progress_reporter,
1,
Some(SEARCH_TOTAL_PHASES),
Some("hnsw_lookup"),
);
let hits = solo_query::run_doc_search(
self.inner.tenant.as_ref(),
self.inner.audit_principal.clone(),
&args.query,
args.limit,
)
.await
.map_err(solo_to_mcp)?;
crate::mcp_progress::report_if_some(
progress_reporter,
2,
Some(SEARCH_TOTAL_PHASES),
Some("reranked"),
);
crate::mcp_progress::report_if_some(
progress_reporter,
SEARCH_TOTAL_PHASES,
Some(SEARCH_TOTAL_PHASES),
Some(&format!("returning {} hits", hits.len())),
);
let body = serde_json::to_string_pretty(&hits).unwrap_or_else(|_| String::new());
Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Looks up one document via `solo_query::inspect_document` and replies with
/// the record as pretty-printed JSON.
///
/// # Errors
///
/// An id that does not parse as a `DocumentId`, or a lookup that finds no
/// document, is `invalid_params`; lookup failures are converted with
/// `solo_to_mcp`. A reply that cannot be serialized is reported as an internal
/// error instead of an empty success body.
async fn handle_inspect_document(
    &self,
    args: InspectDocumentArgs,
) -> std::result::Result<CallToolResult, McpError> {
    let doc_id = DocumentId::from_str(&args.doc_id)
        .map_err(|e| McpError::invalid_params(format!("invalid doc_id: {e}"), None))?;
    let result_opt = solo_query::inspect_document(
        self.inner.tenant.read(),
        self.inner.tenant.audit(),
        self.inner.audit_principal.clone(),
        &doc_id,
    )
    .await
    .map_err(solo_to_mcp)?;
    match result_opt {
        Some(record) => {
            let body = serde_json::to_string_pretty(&record).map_err(|e| {
                McpError::internal_error(
                    format!("memory_inspect_document: serialize reply: {e}"),
                    None,
                )
            })?;
            Ok(CallToolResult::success(vec![Content::text(body)]))
        }
        None => Err(McpError::invalid_params(
            format!("document {doc_id} not found"),
            None,
        )),
    }
}
async fn handle_list_documents(
&self,
args: ListDocumentsArgs,
) -> std::result::Result<CallToolResult, McpError> {
let rows = solo_query::list_documents(
self.inner.tenant.read(),
self.inner.tenant.audit(),
self.inner.audit_principal.clone(),
args.limit,
args.offset,
args.include_forgotten,
)
.await
.map_err(solo_to_mcp)?;
let body = serde_json::to_string_pretty(&rows).unwrap_or_else(|_| String::new());
Ok(CallToolResult::success(vec![Content::text(body)]))
}
/// Forgets the document named by `args.doc_id` and replies with the writer's
/// report as pretty-printed JSON.
///
/// # Errors
///
/// An id that does not parse as a `DocumentId` is `invalid_params`; writer
/// failures are converted with `solo_to_mcp`. A reply that cannot be
/// serialized is reported as an internal error instead of an empty success
/// body.
async fn handle_forget_document(
    &self,
    args: ForgetDocumentArgs,
) -> std::result::Result<CallToolResult, McpError> {
    let doc_id = DocumentId::from_str(&args.doc_id)
        .map_err(|e| McpError::invalid_params(format!("invalid doc_id: {e}"), None))?;
    let report = self
        .inner
        .tenant
        .write()
        .forget_document_as(self.inner.audit_principal.clone(), doc_id)
        .await
        .map_err(solo_to_mcp)?;
    let body = serde_json::to_string_pretty(&report).map_err(|e| {
        McpError::internal_error(
            format!("memory_forget_document: serialize reply: {e}"),
            None,
        )
    })?;
    Ok(CallToolResult::success(vec![Content::text(body)]))
}
}
#[cfg(test)]
mod dispatch_tests {
use super::*;
use serde_json::json;
use solo_core::VectorIndex;
use solo_storage::test_support::StubVectorIndex;
use solo_storage::{
EmbedderConfig, IdentityConfig, KeyMaterial, ReaderPool, SoloConfig, StubEmbedder,
TenantHandle, TenantRegistry, WriterActor, WriterSpawn,
};
use std::sync::Arc as StdArc;
/// Builds the `SoloConfig` used by the test harness.
///
/// The embedder section names the stub embedder (`"stub"` / `"v1"`, `f32`) with
/// the given `dim`; `auth` and `llm` are unset and every other section uses its
/// `Default`.
fn fake_config(dim: u32) -> SoloConfig {
    let embedder = EmbedderConfig {
        name: String::from("stub"),
        version: String::from("v1"),
        dim,
        dtype: String::from("f32"),
    };
    SoloConfig {
        schema_version: 1,
        salt_hex: String::from("00000000000000000000000000000000"),
        embedder,
        identity: IdentityConfig::default(),
        documents: solo_storage::DocumentConfig::default(),
        auth: None,
        audit: solo_storage::AuditSettings::default(),
        redaction: solo_storage::RedactionConfig::default(),
        llm: None,
        triples: solo_storage::TriplesConfig::default(),
        sampling: solo_storage::SamplingConfig::default(),
        steward: solo_storage::StewardSettings::default(),
    }
}
/// Test fixture bundling a `SoloMcpServer` with the resources backing it.
///
/// Field order is left as-is on purpose: fields drop in declaration order when
/// a test panics before `shutdown` runs.
struct Harness {
// Server under test.
server: SoloMcpServer,
// Temp directory that contains the test database; owned here so it is not
// removed while the server is in use.
_tmp: tempfile::TempDir,
// Path of the SQLite file inside `_tmp`, used by `open_db`.
db_path: std::path::PathBuf,
// Extra clone of the writer handle; dropped explicitly in `shutdown`.
write_handle_extra: Option<solo_storage::WriteHandle>,
// Join handle of the writer thread returned by `WriterActor::spawn_full`.
join: Option<std::thread::JoinHandle<()>>,
}
impl Harness {
/// Builds a server over a fresh temp-dir database, a stub vector index and a
/// stub embedder (both with dimension 16).
///
/// `runtime` is only used to block on `ReaderPool::new`.
fn new(runtime: &tokio::runtime::Runtime) -> Self {
use solo_storage::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
let tmp = tempfile::TempDir::new().unwrap();
let dim = 16usize;
let hnsw: StdArc<dyn VectorIndex + Send + Sync> =
StdArc::new(StubVectorIndex::new(dim));
let embedder: StdArc<dyn solo_core::Embedder> =
StdArc::new(StubEmbedder::new("stub", "v1", dim));
let conn = solo_storage::test_support::open_test_db_at(&tmp.path().join("test.db"));
// Register the stub embedder in the database so the writer can be given its id.
let embedder_id = get_or_insert_embedder_id(
&conn,
&EmbedderIdentity {
name: "stub".into(),
version: "v1".into(),
dim: dim as u32,
dtype: "f32".into(),
},
)
.expect("register stub embedder");
// The connection is handed to the writer actor; `join` is its thread handle.
let WriterSpawn { handle, join } =
WriterActor::spawn_full(conn, hnsw.clone(), tmp.path().to_path_buf(), embedder_id);
let path = tmp.path().join("test.db");
let pool: ReaderPool =
runtime.block_on(async { ReaderPool::new(&path, None, hnsw.clone()).unwrap() });
let tenant_id = solo_core::TenantId::default_tenant();
let tenant_handle = StdArc::new(TenantHandle::from_parts_for_tests(
tenant_id.clone(),
fake_config(dim as u32),
path.clone(),
tmp.path().to_path_buf(),
embedder_id,
hnsw,
embedder.clone(),
handle.clone(),
// A no-op thread's handle is passed here; the real writer join handle
// stays on the harness so `shutdown` can wait on it.
std::thread::spawn(|| {}),
pool,
));
let key = KeyMaterial::from_bytes_for_tests([0u8; 32]);
let registry = StdArc::new(TenantRegistry::for_tests_with_single_tenant(
tmp.path().to_path_buf(),
key,
embedder,
tenant_handle.clone(),
));
let server = SoloMcpServer::new_for_tenant(registry, tenant_handle, Vec::new());
Harness {
server,
_tmp: tmp,
db_path: path,
write_handle_extra: Some(handle),
join: Some(join),
}
}
/// Opens a new connection to the harness database via the test-support helper.
fn open_db(&self) -> rusqlite::Connection {
solo_storage::test_support::open_test_db_at(&self.db_path)
}
/// Tears the harness down and waits for the writer thread to exit.
///
/// Drops the extra write handle, the server and the temp directory, in that
/// order, then waits up to five seconds for the writer thread.
///
/// Panics if the thread does not exit in time or if it panicked.
fn shutdown(mut self, runtime: &tokio::runtime::Runtime) {
let join = self.join.take();
let extra = self.write_handle_extra.take();
runtime.block_on(async move {
drop(extra);
drop(self.server);
drop(self._tmp);
if let Some(join) = join {
// `JoinHandle::join` has no timeout, so a helper thread performs the join
// and reports through a channel that can be waited on with a deadline.
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let _ = tx.send(join.join());
});
// The blocking receive runs on the blocking pool rather than in the async task.
tokio::task::spawn_blocking(move || {
rx.recv_timeout(std::time::Duration::from_secs(5))
})
.await
.expect("blocking task")
.expect("writer thread did not exit within 5s")
.expect("writer thread panicked");
}
});
}
}
/// Creates the multi-threaded Tokio runtime (two workers, all drivers enabled)
/// that the tests in this module run on.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(2).enable_all();
    builder.build().unwrap()
}
/// Returns the `text` field of the first content item in a tool result.
///
/// The item is serialized to JSON to read the field. If it has no string
/// `text` field, the whole serialized item is returned instead.
///
/// Panics if the result has no content items or the item fails to serialize.
fn first_text(r: &rmcp::model::CallToolResult) -> String {
    let item = r.content.first().expect("at least one content item");
    let v = serde_json::to_value(item).expect("content serialises");
    match v.get("text").and_then(serde_json::Value::as_str) {
        Some(text) => text.to_owned(),
        None => format!("{v}"),
    }
}
/// Inserts one active episode row with the given content straight into the
/// database and returns its freshly generated `MemoryId` together with the
/// SQLite rowid of the inserted row.
fn seed_episode(conn: &rusqlite::Connection, content: &str) -> (MemoryId, i64) {
    // The SQL literal is kept flush-left so its bytes match the original text.
    let sql = "INSERT INTO episodes
(memory_id, ts_ms, source_type, content, confidence, strength,
salience, tier, status, created_at_ms, updated_at_ms)
VALUES (?1, 0, 'test', ?2, 0.9, 0.5, 0.5, 'hot', 'active', 0, 0)";
    let memory_id = MemoryId::new();
    conn.execute(sql, rusqlite::params![memory_id.to_string(), content])
        .expect("seed episode");
    let rowid = conn.last_insert_rowid();
    (memory_id, rowid)
}
/// Inserts one active triple row with a literal object straight into the
/// database, optionally linked to the episode rowid it was derived from.
fn seed_triple_row(
    conn: &rusqlite::Connection,
    triple_id: &str,
    subject: &str,
    predicate: &str,
    object: &str,
    source_episode_rowid: Option<i64>,
) {
    // The SQL literal is kept flush-left so its bytes match the original text.
    let sql = "INSERT INTO triples
(triple_id, subject_id, predicate, object_id, object_kind,
valid_from_ms, valid_to_ms, confidence, provenance_json,
status, created_at_ms, updated_at_ms, source_episode_id)
VALUES (?1, ?2, ?3, ?4, 'literal', 0, NULL, 0.9, '{}',
'active', 0, 0, ?5)";
    let row = rusqlite::params![triple_id, subject, predicate, object, source_episode_rowid];
    conn.execute(sql, row).expect("seed triple");
}
/// Inserts one unresolved contradiction row between `a_id` and `b_id` of the
/// given `kind` straight into the database.
fn seed_contradiction_row(conn: &rusqlite::Connection, a_id: &str, b_id: &str, kind: &str) {
    // The SQL literal is kept flush-left so its bytes match the original text.
    let sql = "INSERT INTO contradictions
(a_memory_id, b_memory_id, kind, explanation, detected_at_ms,
status, resolved_at_ms, resolution_note, winning_triple_id)
VALUES (?1, ?2, ?3, 'test contradiction', 0,
'unresolved', NULL, NULL, NULL)";
    let row = rusqlite::params![a_id, b_id, kind];
    conn.execute(sql, row).expect("seed contradiction");
}
/// The tool list contains exactly the eighteen expected names, in order, and
/// every tool has a non-empty description and a schema that can be rendered.
#[test]
fn tools_list_returns_eighteen_canonical_tools() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let tools = harness.server.dispatch_list_tools();

    let expected = vec![
        "memory_remember",
        "memory_remember_batch",
        "memory_recall",
        "memory_context",
        "memory_forget",
        "memory_inspect",
        "memory_update",
        "memory_themes",
        "memory_facts_about",
        "memory_entities",
        "memory_contradictions",
        "memory_contradiction_resolve",
        "memory_inspect_cluster",
        "memory_ingest_document",
        "memory_search_docs",
        "memory_inspect_document",
        "memory_list_documents",
        "memory_forget_document",
    ];
    let names: Vec<&str> = tools.iter().map(|tool| tool.name.as_ref()).collect();
    assert_eq!(names, expected);

    for t in &tools {
        let desc = t.description.as_deref().unwrap_or("");
        assert!(!desc.is_empty(), "{} description empty", t.name);
        // Rendering the schema must not panic; the value itself is not inspected.
        let _schema = t.schema_as_json_value();
    }
    harness.shutdown(&runtime);
}
/// `memory_themes` with no arguments on a fresh database replies with an empty
/// JSON array.
#[test]
fn themes_returns_json_array_on_empty_db() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let text = runtime.block_on(async {
        let reply = harness
            .server
            .dispatch_tool("memory_themes", json!({}), None)
            .await
            .expect("themes succeeds");
        first_text(&reply)
    });
    let v: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
    assert!(v.is_array(), "expected array, got: {text}");
    assert_eq!(v.as_array().unwrap().len(), 0);
    harness.shutdown(&runtime);
}
/// `memory_themes` accepts explicit `window_days` and `limit` arguments and
/// still replies with a JSON array.
#[test]
fn themes_passes_through_window_and_limit_args() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let text = runtime.block_on(async {
        let request = json!({ "window_days": 7, "limit": 20 });
        let reply = harness
            .server
            .dispatch_tool("memory_themes", request, None)
            .await
            .expect("themes with args succeeds");
        first_text(&reply)
    });
    let parsed: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
    assert!(parsed.is_array());
    harness.shutdown(&runtime);
}
/// `memory_facts_about` with a blank subject fails, and the error mentions
/// either the subject or invalid input.
#[test]
fn facts_about_rejects_empty_subject() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        harness
            .server
            .dispatch_tool("memory_facts_about", json!({ "subject": " " }), None)
            .await
            .expect_err("empty subject must error")
    });
    let s = format!("{err:?}");
    let lowered = s.to_lowercase();
    assert!(
        lowered.contains("subject") || lowered.contains("invalid"),
        "got: {s}"
    );
    harness.shutdown(&runtime);
}
/// `memory_facts_about` for a subject nothing is known about succeeds with an
/// empty JSON array.
#[test]
fn facts_about_returns_array_for_unknown_subject() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let text = runtime.block_on(async {
        let request = json!({ "subject": "NobodyKnowsThisSubject" });
        let reply = harness
            .server
            .dispatch_tool("memory_facts_about", request, None)
            .await
            .expect("facts_about with unknown subject succeeds");
        first_text(&reply)
    });
    let parsed: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
    assert_eq!(parsed.as_array().unwrap().len(), 0);
    harness.shutdown(&runtime);
}
/// `memory_facts_about` succeeds both with `include_as_object: true` and with
/// the argument omitted, replying with an empty array each time on a fresh
/// database.
#[test]
fn facts_about_accepts_include_as_object_arg() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    runtime.block_on(async {
        let cases = [
            (
                json!({ "subject": "Maya", "include_as_object": true }),
                "dispatch with include_as_object=true succeeds",
            ),
            (
                json!({ "subject": "Maya" }),
                "dispatch without include_as_object succeeds (default false)",
            ),
        ];
        for (request, why) in cases {
            let reply = harness
                .server
                .dispatch_tool("memory_facts_about", request, None)
                .await
                .expect(why);
            let parsed: serde_json::Value =
                serde_json::from_str(&first_text(&reply)).expect("parses as json");
            assert_eq!(parsed.as_array().unwrap().len(), 0);
        }
    });
    harness.shutdown(&runtime);
}
/// `memory_contradictions` with no arguments on a fresh database replies with
/// an empty JSON array.
#[test]
fn contradictions_returns_json_array_on_empty_db() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let text = runtime.block_on(async {
        let reply = harness
            .server
            .dispatch_tool("memory_contradictions", json!({}), None)
            .await
            .expect("contradictions succeeds");
        first_text(&reply)
    });
    let parsed: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
    assert!(parsed.is_array());
    assert_eq!(parsed.as_array().unwrap().len(), 0);
    harness.shutdown(&runtime);
}
/// After seeding an `Alice knows Bob` triple, `memory_entities` with the query
/// `Ali` replies with a row whose `entity_id` is `Alice`.
#[test]
fn entities_returns_matching_graph_entities() {
    let runtime = rt();
    let harness = Harness::new(&runtime);

    // Seed through a separate connection and close it before querying.
    let conn = harness.open_db();
    let (_memory_id, rowid) = seed_episode(&conn, "Alice graph seed");
    seed_triple_row(&conn, "t-mcp-entity-1", "Alice", "knows", "Bob", Some(rowid));
    drop(conn);

    let v: serde_json::Value = runtime.block_on(async {
        let reply = harness
            .server
            .dispatch_tool("memory_entities", json!({ "query": "Ali" }), None)
            .await
            .expect("entities succeeds");
        serde_json::from_str(&first_text(&reply)).expect("parses as json")
    });
    let found_alice = v.as_array().unwrap().iter().any(|row| {
        row.get("entity_id").and_then(serde_json::Value::as_str) == Some("Alice")
    });
    assert!(found_alice, "expected Alice entity, got {v}");
    harness.shutdown(&runtime);
}
/// Resolving a seeded contradiction replies with status `resolved` and an
/// integer `resolved_at_ms`.
#[test]
fn contradiction_resolve_updates_lifecycle() {
    let runtime = rt();
    let harness = Harness::new(&runtime);

    // Seed two conflicting triples plus the contradiction row, then close the
    // seeding connection before dispatching.
    let conn = harness.open_db();
    let (_memory_id, rowid) = seed_episode(&conn, "contradiction seed");
    seed_triple_row(&conn, "t-mcp-a", "Alice", "likes", "tea", Some(rowid));
    seed_triple_row(&conn, "t-mcp-b", "Alice", "likes", "coffee", Some(rowid));
    seed_contradiction_row(&conn, "t-mcp-a", "t-mcp-b", "other");
    drop(conn);

    let resolved: serde_json::Value = runtime.block_on(async {
        let request = json!({
            "a_id": "t-mcp-a",
            "b_id": "t-mcp-b",
            "kind": "other",
            "resolution_note": "tea is current",
            "winning_triple_id": "t-mcp-a"
        });
        let reply = harness
            .server
            .dispatch_tool("memory_contradiction_resolve", request, None)
            .await
            .expect("resolve succeeds");
        serde_json::from_str(&first_text(&reply)).expect("parses as json")
    });
    assert_eq!(
        resolved.get("status").and_then(|status| status.as_str()),
        Some("resolved")
    );
    assert!(
        resolved
            .get("resolved_at_ms")
            .and_then(|stamp| stamp.as_i64())
            .is_some()
    );
    harness.shutdown(&runtime);
}
/// A remembered sentence comes back when recalled with the same text.
#[test]
fn remember_then_recall_round_trip() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    runtime.block_on(async {
        let stored = harness
            .server
            .dispatch_tool(
                "memory_remember",
                json!({ "content": "the cat sat on the mat" }),
                None,
            )
            .await
            .expect("remember succeeds");
        let text = first_text(&stored);
        assert!(text.starts_with("remembered "), "got: {text}");

        let recalled = harness
            .server
            .dispatch_tool(
                "memory_recall",
                json!({ "query": "the cat sat on the mat", "limit": 5 }),
                None,
            )
            .await
            .expect("recall succeeds");
        let text = first_text(&recalled);
        assert!(text.contains("the cat sat on the mat"), "got: {text}");
    });
    harness.shutdown(&runtime);
}
/// Updating a remembered memory replies with a record carrying the new
/// content.
#[test]
fn update_rewrites_memory_content() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let updated: serde_json::Value = runtime.block_on(async {
        let stored = harness
            .server
            .dispatch_tool(
                "memory_remember",
                json!({ "content": "old mcp transport memory" }),
                None,
            )
            .await
            .expect("remember succeeds");
        // The remember reply is `remembered <id>`; strip the prefix to get the id.
        let stored_text = first_text(&stored);
        let mid = stored_text
            .strip_prefix("remembered ")
            .expect("remembered prefix")
            .to_string();

        let request = json!({
            "memory_id": mid,
            "content": "new mcp transport memory"
        });
        let reply = harness
            .server
            .dispatch_tool("memory_update", request, None)
            .await
            .expect("update succeeds");
        serde_json::from_str(&first_text(&reply)).expect("parses as json")
    });
    assert_eq!(
        updated.get("content").and_then(|content| content.as_str()),
        Some("new mcp transport memory")
    );
    harness.shutdown(&runtime);
}
/// `memory_context` replies with a JSON bundle that echoes the query, includes
/// the remembered content among its recall hits, and carries `themes`,
/// `facts` and `contradictions` arrays.
#[test]
fn memory_context_returns_json_bundle() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let v: serde_json::Value = runtime.block_on(async {
        harness
            .server
            .dispatch_tool(
                "memory_remember",
                json!({ "content": "memory context round trip" }),
                None,
            )
            .await
            .expect("remember succeeds");
        let reply = harness
            .server
            .dispatch_tool(
                "memory_context",
                json!({ "query": "memory context", "limit": 5 }),
                None,
            )
            .await
            .expect("memory_context succeeds");
        serde_json::from_str(&first_text(&reply)).expect("parses as json")
    });
    assert_eq!(v["query"], "memory context");
    let recalled = v["recall"]["hits"]
        .as_array()
        .unwrap()
        .iter()
        .any(|hit| hit["content"] == "memory context round trip");
    assert!(
        recalled,
        "context recall should include remembered content: {v}"
    );
    assert!(v["themes"].is_array());
    assert!(v["facts"].is_array());
    assert!(v["contradictions"].is_array());
    harness.shutdown(&runtime);
}
/// A memory that has been forgotten no longer appears in recall output.
#[test]
fn forget_excludes_row_from_subsequent_recall() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let text = runtime.block_on(async {
        let stored = harness
            .server
            .dispatch_tool(
                "memory_remember",
                json!({ "content": "to be forgotten" }),
                None,
            )
            .await
            .unwrap();
        let stored_text = first_text(&stored);
        let mid = stored_text.strip_prefix("remembered ").unwrap().to_string();

        harness
            .server
            .dispatch_tool(
                "memory_forget",
                json!({ "memory_id": mid, "reason": "test" }),
                None,
            )
            .await
            .expect("forget succeeds");

        let recalled = harness
            .server
            .dispatch_tool(
                "memory_recall",
                json!({ "query": "to be forgotten", "limit": 5 }),
                None,
            )
            .await
            .unwrap();
        first_text(&recalled)
    });
    // Matches the pretty-printed JSON form of the content field.
    let still_present = text.contains(r#""content": "to be forgotten""#);
    assert!(
        !still_present,
        "forgotten row should be excluded; got: {text}"
    );
    harness.shutdown(&runtime);
}
/// `memory_remember` with empty content fails with a "must not be empty"
/// error.
#[test]
fn empty_remember_returns_invalid_params() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        harness
            .server
            .dispatch_tool("memory_remember", json!({ "content": "" }), None)
            .await
            .unwrap_err()
    });
    let rendered = format!("{err:?}");
    assert!(rendered.contains("must not be empty"));
    harness.shutdown(&runtime);
}
/// `memory_recall` with a blank query fails with a "must not be empty" error.
#[test]
fn empty_recall_query_returns_invalid_params() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        harness
            .server
            .dispatch_tool("memory_recall", json!({ "query": " " }), None)
            .await
            .unwrap_err()
    });
    let rendered = format!("{err:?}");
    assert!(rendered.contains("must not be empty"));
    harness.shutdown(&runtime);
}
/// `memory_inspect` with an id that is not a valid memory id fails with an
/// "invalid memory_id" error.
#[test]
fn inspect_with_invalid_id_returns_invalid_params() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        harness
            .server
            .dispatch_tool("memory_inspect", json!({ "memory_id": "not-a-uuid" }), None)
            .await
            .unwrap_err()
    });
    let rendered = format!("{err:?}");
    assert!(rendered.contains("invalid memory_id"));
    harness.shutdown(&runtime);
}
/// `memory_forget` with a well-formed id that does not exist fails with a
/// "not found" error.
#[test]
fn forget_unknown_id_returns_invalid_params() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        let request = json!({ "memory_id": "00000000-0000-7000-8000-000000000000" });
        harness
            .server
            .dispatch_tool("memory_forget", request, None)
            .await
            .unwrap_err()
    });
    let rendered = format!("{err:?}");
    assert!(rendered.contains("not found"));
    harness.shutdown(&runtime);
}
/// Dispatching a tool name the server does not know fails with an
/// "unknown tool" error.
#[test]
fn unknown_tool_name_returns_invalid_params() {
    let runtime = rt();
    let harness = Harness::new(&runtime);
    let err = runtime.block_on(async {
        harness
            .server
            .dispatch_tool("memory.summon", json!({}), None)
            .await
            .unwrap_err()
    });
    let rendered = format!("{err:?}");
    assert!(rendered.contains("unknown tool"));
    harness.shutdown(&runtime);
}
/// Every tool name satisfies the name rules checked here for Anthropic, OpenAI
/// and Gemini, there are exactly 18 tools, and `tool_names()` lists the same
/// names in the same order as `build_tools()`.
#[test]
fn tool_names_match_cross_provider_regex() {
    /// 1 to 64 bytes; ASCII letters, digits, `_` or `-` only.
    fn passes_anthropic(name: &str) -> bool {
        (1..=64).contains(&name.len())
            && name
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
    }
    /// 1 to 64 bytes; starts with an ASCII letter or `_`; the rest are ASCII
    /// letters, digits, `_` or `-`.
    fn passes_openai(name: &str) -> bool {
        if !(1..=64).contains(&name.len()) {
            return false;
        }
        let mut rest = name.chars();
        match rest.next() {
            Some(first) if first.is_ascii_alphabetic() || first == '_' => {
                rest.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
            }
            _ => false,
        }
    }
    /// 1 to 63 bytes; starts with an ASCII letter or `_`; the rest are ASCII
    /// letters, digits or `_` (no hyphen).
    fn passes_gemini(name: &str) -> bool {
        if !(1..=63).contains(&name.len()) {
            return false;
        }
        let mut rest = name.chars();
        match rest.next() {
            Some(first) if first.is_ascii_alphabetic() || first == '_' => {
                rest.all(|c| c.is_ascii_alphanumeric() || c == '_')
            }
            _ => false,
        }
    }

    let tools = build_tools();
    assert_eq!(
        tools.len(),
        18,
        "expected 18 tools (context + update/entities/resolve + v0.5.x + document tools + remember_batch)"
    );

    let tool_name_strings: Vec<String> = tools.iter().map(|t| t.name.to_string()).collect();
    let public_names: Vec<String> = super::tool_names()
        .iter()
        .map(|name| name.to_string())
        .collect();
    assert_eq!(
        tool_name_strings, public_names,
        "tool_names() drifted from build_tools() — keep them in sync"
    );

    for t in tools {
        assert!(
            passes_anthropic(&t.name),
            "tool name {:?} fails Anthropic regex \
             ^[a-zA-Z0-9_-]{{1,64}}$ — see v0.3 lesson #8",
            t.name
        );
        assert!(
            passes_openai(&t.name),
            "tool name {:?} fails OpenAI function-calling regex \
             ^[a-zA-Z_][a-zA-Z0-9_-]*$ (len ≤ 64)",
            t.name
        );
        assert!(
            passes_gemini(&t.name),
            "tool name {:?} fails Gemini function-calling regex \
             ^[a-zA-Z_][a-zA-Z0-9_]*$ (len ≤ 63, strict)",
            t.name
        );
    }
}
/// The `maxItems` bound on `items` in the `memory_remember_batch` input schema
/// equals `solo_storage::MAX_REMEMBER_BATCH_SIZE`.
#[test]
fn remember_batch_maxitems_matches_max_batch_size() {
    let tools = build_tools();
    let batch = tools
        .iter()
        .find(|tool| tool.name == "memory_remember_batch")
        .expect("memory_remember_batch tool is missing");
    let schema =
        serde_json::to_value(&batch.input_schema).expect("input_schema serialises as JSON");
    // Indexing a missing key yields `Null`, so a broken path still ends in the
    // `expect` below.
    let max_items = schema["properties"]["items"]["maxItems"]
        .as_u64()
        .expect("memory_remember_batch.items.maxItems missing or not a u64");
    assert_eq!(
        max_items as usize,
        solo_storage::MAX_REMEMBER_BATCH_SIZE,
        "memory_remember_batch schema maxItems ({}) must equal \
         solo_storage::MAX_REMEMBER_BATCH_SIZE ({}). If the cap \
         changed, update both — but you should never need to: the \
         schema now interpolates the constant directly.",
        max_items,
        solo_storage::MAX_REMEMBER_BATCH_SIZE,
    );
}
/// Neither the tool descriptions nor the server instructions contain any of
/// the forbidden internal terms (compared case-insensitively).
#[test]
fn tool_descriptions_avoid_internal_jargon() {
    const FORBIDDEN: &[&str] = &[
        "SPO",
        "Steward",
        "Steward-flagged",
        "LEFT JOIN",
        "candidate pair",
        "candidate_pair",
        "tagged_with",
    ];
    fn contains_case_insensitive(haystack: &str, needle: &str) -> bool {
        let haystack = haystack.to_lowercase();
        let needle = needle.to_lowercase();
        haystack.contains(&needle)
    }

    for tool in build_tools() {
        let desc = tool.description.as_deref().unwrap_or("");
        for term in FORBIDDEN {
            let clean = !contains_case_insensitive(desc, term);
            assert!(
                clean,
                "tool {:?} description contains forbidden jargon \
                 {:?} — rewrite in plain English (see v0.5.0 \
                 Priority 4)",
                tool.name,
                term,
            );
        }
    }

    let server_info = harness_server_info();
    let instructions = server_info
        .instructions
        .as_deref()
        .expect("get_info() must set instructions");
    for term in FORBIDDEN {
        let clean = !contains_case_insensitive(instructions, term);
        assert!(
            clean,
            "get_info().instructions contains forbidden jargon \
             {:?} — rewrite in plain English",
            term,
        );
    }
}
/// Builds a fresh `Harness`, captures the server's `get_info()` result,
/// shuts the harness down, and returns the captured info.
fn harness_server_info() -> rmcp::model::ServerInfo {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    let captured = ServerHandler::get_info(&harness.server);
    harness.shutdown(&tokio_rt);
    captured
}
/// Pins the advertised server identity: the name must be `"solo"` and the
/// version must equal this crate's compile-time `CARGO_PKG_VERSION`.
#[test]
fn server_info_identity_is_solo_not_rmcp_or_solo_api() {
    let info = harness_server_info();
    let identity = &info.server_info;
    let (name, version) = (identity.name.as_str(), identity.version.as_str());

    assert_eq!(
        name, "solo",
        "MCP serverInfo.name must be \"solo\" (not \"rmcp\" or \
         \"solo-api\"). got name={name:?} version={version:?}"
    );

    let crate_version = env!("CARGO_PKG_VERSION");
    assert_eq!(
        version, crate_version,
        "MCP serverInfo.version must match solo-api's compile-time \
         CARGO_PKG_VERSION (i.e. the workspace.package version); \
         a mismatch means we regressed back to rmcp's build env. \
         got version={version:?}"
    );
}
/// `memory_inspect_cluster` with an id that does not exist must fail, and the
/// error's debug rendering must name the id or say "not found".
#[test]
fn inspect_cluster_unknown_id_returns_invalid_params() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let args = json!({ "cluster_id": "no-such-cluster" });
        let outcome = harness
            .server
            .dispatch_tool("memory_inspect_cluster", args, None)
            .await;
        let err = outcome.expect_err("unknown cluster must error");
        let s = format!("{err:?}");
        let names_the_id = s.contains("no-such-cluster");
        assert!(
            names_the_id || s.to_lowercase().contains("not found"),
            "expected error to mention the missing cluster id; got: {s}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// A whitespace-only `cluster_id` must be rejected; the error text has to
/// mention `cluster_id` or "must not be empty" (case-insensitive).
#[test]
fn inspect_cluster_rejects_empty_id() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let blank_args = json!({ "cluster_id": "   " });
        let outcome = harness
            .server
            .dispatch_tool("memory_inspect_cluster", blank_args, None)
            .await;
        let err = outcome.expect_err("blank cluster_id must error");
        let s = format!("{err:?}");
        let lowered = s.to_lowercase();
        assert!(
            lowered.contains("cluster_id") || lowered.contains("must not be empty"),
            "got: {s}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// `IngestDocumentArgs` deserialises when `path` is present and the error for
/// an empty object names the missing `path` field.
#[test]
fn ingest_document_args_parse_with_required_path() {
    let parsed = serde_json::from_value::<IngestDocumentArgs>(json!({ "path": "/tmp/notes.md" }))
        .expect("parses");
    assert_eq!(parsed.path, "/tmp/notes.md");

    let missing = serde_json::from_value::<IngestDocumentArgs>(json!({}));
    let err = missing.unwrap_err();
    assert!(err.to_string().contains("path"));
}
/// `SearchDocsArgs.limit` falls back to 5 when omitted and honours an
/// explicit value when supplied.
#[test]
fn search_docs_args_parse_with_default_limit() {
    let defaulted = serde_json::from_value::<SearchDocsArgs>(json!({ "query": "backups" }))
        .expect("parses");
    assert_eq!(defaulted.query, "backups");
    assert_eq!(defaulted.limit, 5, "default limit must be 5");

    let explicit =
        serde_json::from_value::<SearchDocsArgs>(json!({ "query": "backups", "limit": 20 }))
            .expect("parses");
    assert_eq!(explicit.limit, 20);
}
/// `InspectDocumentArgs` deserialises when `doc_id` is present and the error
/// for an empty object names the missing `doc_id` field.
#[test]
fn inspect_document_args_parse_with_required_doc_id() {
    let parsed = serde_json::from_value::<InspectDocumentArgs>(json!({ "doc_id": "abc" }))
        .expect("parses");
    assert_eq!(parsed.doc_id, "abc");

    let missing = serde_json::from_value::<InspectDocumentArgs>(json!({}));
    let err = missing.unwrap_err();
    assert!(err.to_string().contains("doc_id"));
}
/// `ListDocumentsArgs` defaults to `limit = 20`, `offset = 0`,
/// `include_forgotten = false`, and accepts explicit overrides for all three.
#[test]
fn list_documents_args_parse_with_all_defaults() {
    let defaulted = serde_json::from_value::<ListDocumentsArgs>(json!({})).expect("parses");
    assert_eq!(defaulted.limit, 20, "default limit must be 20");
    assert_eq!(defaulted.offset, 0, "default offset must be 0");
    assert!(
        !defaulted.include_forgotten,
        "default include_forgotten must be false"
    );

    let overrides = json!({ "limit": 5, "offset": 10, "include_forgotten": true });
    let explicit = serde_json::from_value::<ListDocumentsArgs>(overrides).expect("parses");
    assert_eq!(explicit.limit, 5);
    assert_eq!(explicit.offset, 10);
    assert!(explicit.include_forgotten);
}
/// `ForgetDocumentArgs` deserialises when `doc_id` is present and the error
/// for an empty object names the missing `doc_id` field.
#[test]
fn forget_document_args_parse_with_required_doc_id() {
    let parsed = serde_json::from_value::<ForgetDocumentArgs>(json!({ "doc_id": "abc" }))
        .expect("parses");
    assert_eq!(parsed.doc_id, "abc");

    let missing = serde_json::from_value::<ForgetDocumentArgs>(json!({}));
    let err = missing.unwrap_err();
    assert!(err.to_string().contains("doc_id"));
}
/// `memory_ingest_document` with an empty `path` must fail; the error text
/// has to mention "path" or "must not be empty" (case-insensitive).
#[test]
fn ingest_document_rejects_empty_path() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let empty_path = json!({ "path": "" });
        let outcome = harness
            .server
            .dispatch_tool("memory_ingest_document", empty_path, None)
            .await;
        let err = outcome.expect_err("empty path must error");
        let s = format!("{err:?}");
        let lowered = s.to_lowercase();
        assert!(
            lowered.contains("path") || lowered.contains("must not be empty"),
            "got: {s}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_search_docs` with a whitespace-only query must fail; the error
/// text has to contain "must not be empty" or "invalid" (case-insensitive).
#[test]
fn search_docs_rejects_empty_query() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let blank_query = json!({ "query": "   " });
        let outcome = harness
            .server
            .dispatch_tool("memory_search_docs", blank_query, None)
            .await;
        let err = outcome.expect_err("empty query must error");
        let s = format!("{err:?}");
        let lowered = s.to_lowercase();
        assert!(
            lowered.contains("must not be empty") || lowered.contains("invalid"),
            "got: {s}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_inspect_document` with a well-formed id that matches no document
/// must fail with an error that says "not found" (case-insensitive).
#[test]
fn inspect_document_unknown_id_returns_invalid_params() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let absent_doc = json!({ "doc_id": "00000000-0000-7000-8000-000000000000" });
        let outcome = harness
            .server
            .dispatch_tool("memory_inspect_document", absent_doc, None)
            .await;
        let err = outcome.expect_err("unknown doc must error");
        let s = format!("{err:?}");
        let says_not_found = s.to_lowercase().contains("not found");
        assert!(says_not_found, "expected 'not found' message; got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_inspect_document` with a `doc_id` that is not UUID-shaped must
/// fail with an error containing "invalid doc_id".
#[test]
fn inspect_document_rejects_malformed_id() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let bad_id = json!({ "doc_id": "not-a-uuid" });
        let outcome = harness
            .server
            .dispatch_tool("memory_inspect_document", bad_id, None)
            .await;
        let err = outcome.expect_err("malformed doc_id must error");
        let s = format!("{err:?}");
        let flagged = s.contains("invalid doc_id");
        assert!(flagged, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// On a fresh harness, `memory_list_documents` with no arguments replies with
/// a JSON array of length zero.
#[test]
fn list_documents_returns_empty_array_on_empty_db() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let reply = harness
            .server
            .dispatch_tool("memory_list_documents", json!({}), None)
            .await
            .expect("list succeeds");
        let text = first_text(&reply);
        let parsed: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
        assert!(parsed.is_array(), "expected array, got: {text}");
        let rows = parsed.as_array().unwrap();
        assert_eq!(rows.len(), 0);
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_list_documents` accepts `limit`, `offset` and `include_forgotten`.
///
/// This only verifies that the call succeeds and replies with a JSON array;
/// it does not inspect whether the arguments changed the result.
#[test]
fn list_documents_passes_through_limit_offset_include_args() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let paging = json!({ "limit": 5, "offset": 10, "include_forgotten": true });
        let reply = harness
            .server
            .dispatch_tool("memory_list_documents", paging, None)
            .await
            .expect("list with args succeeds");
        let text = first_text(&reply);
        let parsed: serde_json::Value = serde_json::from_str(&text).expect("parses as json");
        assert!(parsed.is_array());
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_forget_document` with a `doc_id` that is not UUID-shaped must fail
/// with an error containing "invalid doc_id".
#[test]
fn forget_document_rejects_malformed_id() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let bad_id = json!({ "doc_id": "not-a-uuid" });
        let outcome = harness
            .server
            .dispatch_tool("memory_forget_document", bad_id, None)
            .await;
        let err = outcome.expect_err("malformed doc_id must error");
        let s = format!("{err:?}");
        let flagged = s.contains("invalid doc_id");
        assert!(flagged, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_remember` accepts an explicit in-range `salience` and replies with
/// text that starts with "remembered ".
#[test]
fn remember_with_explicit_salience_round_trips() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let args = json!({ "content": "with salience", "salience": 0.83 });
        let reply = harness
            .server
            .dispatch_tool("memory_remember", args, None)
            .await
            .expect("remember w/ salience succeeds");
        let text = first_text(&reply);
        let acknowledged = text.starts_with("remembered ");
        assert!(acknowledged, "got: {text}");
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_remember` with `salience = 1.5` must fail with an error containing
/// "salience must be".
#[test]
fn remember_with_out_of_range_salience_returns_invalid_params() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let args = json!({ "content": "out of range", "salience": 1.5 });
        let outcome = harness
            .server
            .dispatch_tool("memory_remember", args, None)
            .await;
        let err = outcome.unwrap_err();
        let s = format!("{err:?}");
        let flagged = s.contains("salience must be");
        assert!(flagged, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// Both ends of the salience range (0.0 and 1.0) are accepted by
/// `memory_remember`; each reply must start with "remembered ".
#[test]
fn remember_with_boundary_salience_succeeds() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let boundaries = [0.0_f64, 1.0_f64];
        for s in boundaries {
            let args = json!({ "content": format!("boundary-{s}"), "salience": s });
            let reply = harness
                .server
                .dispatch_tool("memory_remember", args, None)
                .await
                .expect("boundary salience succeeds");
            let text = first_text(&reply);
            assert!(text.starts_with("remembered "));
        }
    });
    harness.shutdown(&tokio_rt);
}
/// A three-item `memory_remember_batch` call replies with a JSON array of
/// three distinct, 36-character string ids.
///
/// NOTE(review): despite the name, the assertions check count, shape and
/// distinctness only; nothing here ties an id back to its input position.
#[test]
fn remember_batch_returns_ids_in_order() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let payload = json!({
            "items": [
                { "content": "batch-a" },
                { "content": "batch-b", "source_type": "user_preference", "salience": 0.9 },
                { "content": "batch-c", "salience": 0.1 },
            ]
        });
        let reply = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, None)
            .await
            .expect("batch succeeds");
        let text = first_text(&reply);
        let parsed: serde_json::Value = serde_json::from_str(&text).expect("reply is JSON");
        let arr = parsed.as_array().expect("reply is array");
        assert_eq!(arr.len(), 3, "3 items in → 3 ids out: {text}");

        // Validate each id's shape while gathering them for the
        // distinctness check below.
        let mut ids: Vec<&str> = Vec::with_capacity(arr.len());
        for v in arr {
            let s = match v.as_str() {
                Some(s) => s,
                None => panic!("non-string id: {v}"),
            };
            assert_eq!(s.len(), 36, "UUID-shaped id expected: {s}");
            ids.push(s);
        }
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), 3, "ids must be distinct: {text}");
    });
    harness.shutdown(&tokio_rt);
}
/// `memory_remember_batch` with an empty `items` array must fail with an
/// error containing "must not be empty".
#[test]
fn remember_batch_empty_items_returns_invalid_params() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let no_items = json!({ "items": [] });
        let outcome = harness
            .server
            .dispatch_tool("memory_remember_batch", no_items, None)
            .await;
        let err = outcome.unwrap_err();
        let s = format!("{err:?}");
        let flagged = s.contains("must not be empty");
        assert!(flagged, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// A batch whose second item has whitespace-only content must fail, and the
/// error must point at `items[1]` and say "must not be empty".
#[test]
fn remember_batch_rejects_per_item_empty_content() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let payload = json!({
            "items": [
                { "content": "ok-1" },
                { "content": "   " },
                { "content": "ok-3" },
            ]
        });
        let outcome = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, None)
            .await;
        let err = outcome.unwrap_err();
        let s = format!("{err:?}");
        let names_index = s.contains("items[1]");
        assert!(names_index, "must mention items[1]: {s}");
        let names_reason = s.contains("must not be empty");
        assert!(names_reason, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// A batch whose second item has `salience = -0.1` must fail, and the error
/// must point at `items[1]` and contain "salience must be".
#[test]
fn remember_batch_rejects_per_item_salience_out_of_range() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let payload = json!({
            "items": [
                { "content": "ok-1", "salience": 0.5 },
                { "content": "out-of-range", "salience": -0.1 },
            ]
        });
        let outcome = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, None)
            .await;
        let err = outcome.unwrap_err();
        let s = format!("{err:?}");
        let names_index = s.contains("items[1]");
        assert!(names_index, "must mention items[1]: {s}");
        let names_reason = s.contains("salience must be");
        assert!(names_reason, "got: {s}");
    });
    harness.shutdown(&tokio_rt);
}
/// Submitting one item more than `solo_storage::MAX_REMEMBER_BATCH_SIZE` must
/// fail with an error that mentions `MAX_REMEMBER_BATCH_SIZE`.
#[test]
fn remember_batch_over_cap_returns_invalid_params() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        // Inclusive upper bound yields exactly cap + 1 items.
        let items: Vec<serde_json::Value> = (0..=solo_storage::MAX_REMEMBER_BATCH_SIZE)
            .map(|i| json!({ "content": format!("over-{i}") }))
            .collect();
        let payload = json!({ "items": items });
        let outcome = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, None)
            .await;
        let err = outcome.unwrap_err();
        let s = format!("{err:?}");
        let names_cap = s.contains("MAX_REMEMBER_BATCH_SIZE");
        assert!(names_cap, "must mention the cap: {s}");
    });
    harness.shutdown(&tokio_rt);
}
use crate::mcp_progress::{ProgressReporter, ProgressToken};
use crate::mcp_session::SessionState;
use std::sync::Arc as StdArc2;
/// Creates a new shared `SessionState` for the default tenant, passing `None`
/// as the second constructor argument.
fn fresh_progress_session() -> StdArc2<SessionState> {
    let tenant = solo_core::TenantId::default_tenant();
    let state = SessionState::new(tenant, None);
    StdArc2::new(state)
}
/// Collects events from `rx` without blocking, stopping at the first
/// `try_recv` call that returns an error.
fn drain_progress_events(
    rx: &mut tokio::sync::broadcast::Receiver<crate::mcp_session::McpStreamEvent>,
) -> Vec<crate::mcp_session::McpStreamEvent> {
    // `from_fn` ends the iteration as soon as the closure yields `None`,
    // i.e. on the first receive error.
    std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
/// With a reporter attached and `limit = 150`, `memory_search_docs` must
/// publish exactly three progress events carrying the reporter's token,
/// `total = 3`, and `progress` counting 1, 2, 3.
#[test]
fn search_docs_emits_progress_only_when_top_k_above_100() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let token = ProgressToken(json!(42));
        let reporter = ProgressReporter::new(session.clone(), token);
        let args = json!({ "query": "anything", "limit": 150 });
        let _reply = harness
            .server
            .dispatch_tool("memory_search_docs", args, Some(reporter))
            .await
            .expect("search succeeds");

        let events = drain_progress_events(&mut rx);
        assert_eq!(
            events.len(),
            3,
            "expected 3 search progress events at top_k=150, got {}",
            events.len()
        );
        // Pair each event with its expected 1-based step number.
        for (ev, step) in events.iter().zip(1u64..) {
            let params = &ev.data["params"];
            assert_eq!(params["progressToken"], json!(42));
            assert_eq!(params["total"], json!(3));
            assert_eq!(params["progress"], json!(step));
        }
    });
    harness.shutdown(&tokio_rt);
}
/// With a reporter attached but `limit = 50`, `memory_search_docs` must not
/// publish any progress events to the session.
#[test]
fn search_docs_emits_no_progress_when_top_k_below_threshold() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let token = ProgressToken(json!("t"));
        let reporter = ProgressReporter::new(session.clone(), token);
        let args = json!({ "query": "anything", "limit": 50 });
        let _reply = harness
            .server
            .dispatch_tool("memory_search_docs", args, Some(reporter))
            .await
            .expect("search succeeds");

        let events = drain_progress_events(&mut rx);
        let silent = events.is_empty();
        assert!(
            silent,
            "expected no progress events at top_k=50, got {events:?}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// A 51-item `memory_remember_batch` with a reporter attached must publish
/// four progress events: `progress` values 25, 50, 51, 51, the final one
/// with message "inserted", all carrying token "batch" and `total = 51`.
#[test]
fn remember_batch_emits_progress_only_when_size_above_50() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let token = ProgressToken(json!("batch"));
        let reporter = ProgressReporter::new(session.clone(), token);
        let items: Vec<serde_json::Value> = (0..51)
            .map(|i| json!({ "content": format!("item-{i}") }))
            .collect();
        let payload = json!({ "items": items });
        let _reply = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, Some(reporter))
            .await
            .expect("batch succeeds");

        let events = drain_progress_events(&mut rx);
        assert_eq!(
            events.len(),
            4,
            "expected 4 batch progress events for 51 items, got {}: {events:?}",
            events.len()
        );

        // A missing or non-integer `progress` field is read as 0, which
        // then fails the sequence comparison below.
        let mut progresses: Vec<u64> = Vec::with_capacity(events.len());
        for e in &events {
            progresses.push(e.data["params"]["progress"].as_u64().unwrap_or(0));
        }
        assert_eq!(progresses, [25, 50, 51, 51]);

        let final_event = events.last().unwrap();
        assert_eq!(final_event.data["params"]["message"], json!("inserted"));

        for ev in events.iter() {
            let params = &ev.data["params"];
            assert_eq!(params["progressToken"], json!("batch"));
            assert_eq!(params["total"], json!(51));
        }
    });
    harness.shutdown(&tokio_rt);
}
/// A 5-item `memory_remember_batch` with a reporter attached must not publish
/// any progress events to the session.
#[test]
fn remember_batch_emits_no_progress_when_size_below_threshold() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let token = ProgressToken(json!("t"));
        let reporter = ProgressReporter::new(session.clone(), token);
        let items: Vec<serde_json::Value> = (0..5)
            .map(|i| json!({ "content": format!("small-{i}") }))
            .collect();
        let payload = json!({ "items": items });
        let _reply = harness
            .server
            .dispatch_tool("memory_remember_batch", payload, Some(reporter))
            .await
            .expect("batch succeeds");

        let events = drain_progress_events(&mut rx);
        let silent = events.is_empty();
        assert!(
            silent,
            "expected no progress events for 5-item batch, got {events:?}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// When `dispatch_tool` is called with no reporter (`None`), a separately
/// created session must receive no events, even for a `limit = 200` search.
#[test]
fn stdio_transport_does_not_emit_progress_events() {
    let tokio_rt = rt();
    let harness = Harness::new(&tokio_rt);
    tokio_rt.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let args = json!({ "query": "anything", "limit": 200 });
        let _reply = harness
            .server
            .dispatch_tool("memory_search_docs", args, None)
            .await
            .expect("search succeeds without reporter");

        let events = drain_progress_events(&mut rx);
        let silent = events.is_empty();
        assert!(
            silent,
            "stdio path (no reporter) must not publish to ANY session: {events:?}"
        );
    });
    harness.shutdown(&tokio_rt);
}
/// Event ids published to one session must increase strictly, even when the
/// events come from two different reporters (tokens "a" and "b").
///
/// Runs two `limit = 150` searches against the same session, then checks that
/// at least six events arrived and that every adjacent pair of ids is
/// strictly increasing.
#[test]
fn progress_event_id_monotonic_per_session() {
    let runtime = rt();
    let h = Harness::new(&runtime);
    runtime.block_on(async {
        let session = fresh_progress_session();
        let mut rx = session.subscribe_events();
        let r1 = ProgressReporter::new(session.clone(), ProgressToken(json!("a")));
        let r2 = ProgressReporter::new(session.clone(), ProgressToken(json!("b")));
        // Fail on the dispatch error itself rather than discarding it:
        // otherwise a broken search only shows up as a misleading
        // "expected at least 6 events" failure further down.
        h.server
            .dispatch_tool(
                "memory_search_docs",
                json!({ "query": "q1", "limit": 150 }),
                Some(r1),
            )
            .await
            .expect("first search succeeds");
        h.server
            .dispatch_tool(
                "memory_search_docs",
                json!({ "query": "q2", "limit": 150 }),
                Some(r2),
            )
            .await
            .expect("second search succeeds");
        let events = drain_progress_events(&mut rx);
        assert!(events.len() >= 6, "expected at least 6 events: {events:?}");
        let ids: Vec<u64> = events.iter().map(|e| e.id).collect();
        for w in ids.windows(2) {
            assert!(w[0] < w[1], "event ids must be strictly monotonic: {ids:?}");
        }
    });
    h.shutdown(&runtime);
}
}
#[cfg(test)]
mod principal_extraction_tests {
    //! Tests for `resolve_mcp_principal`: how an optional header value and
    //! the `ENV_MCP_PRINCIPAL_TOKEN` environment variable combine into the
    //! resolved principal.

    use super::*;
    use std::sync::Mutex;

    /// Taken by every test in this module, since they all mutate the same
    /// environment variable.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Removes `ENV_MCP_PRINCIPAL_TOKEN` from the environment when dropped.
    struct EnvGuard;

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // SAFETY: every test in this module acquires `ENV_LOCK` before
            // creating a guard and releases it after the guard drops, so
            // these mutations are serialised with respect to each other.
            // NOTE(review): code outside this module that reads the
            // environment concurrently is not covered by that lock.
            unsafe { std::env::remove_var(ENV_MCP_PRINCIPAL_TOKEN) };
        }
    }

    /// Acquires `ENV_LOCK`, recovering the guard if a previous holder
    /// panicked and poisoned the mutex.
    fn hold_env_lock() -> std::sync::MutexGuard<'static, ()> {
        match ENV_LOCK.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        }
    }

    /// Sets the principal variable to `val`; the returned guard unsets it.
    fn set_principal_env(val: &str) -> EnvGuard {
        // SAFETY: see `EnvGuard::drop`; callers hold `ENV_LOCK`.
        unsafe { std::env::set_var(ENV_MCP_PRINCIPAL_TOKEN, val) };
        EnvGuard
    }

    /// Unsets the principal variable now; the returned guard unsets it again
    /// on drop.
    fn clear_principal_env() -> EnvGuard {
        // SAFETY: see `EnvGuard::drop`; callers hold `ENV_LOCK`.
        unsafe { std::env::remove_var(ENV_MCP_PRINCIPAL_TOKEN) };
        EnvGuard
    }

    /// No header plus a set variable resolves to the variable's value.
    #[test]
    fn stdio_env_var_resolves_to_principal() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("alice-token");
        let principal = resolve_mcp_principal(None);
        assert_eq!(principal.as_deref(), Some("alice-token"));
    }

    /// No header and no variable resolves to `None`.
    #[test]
    fn stdio_no_env_var_resolves_to_none() {
        let _lock = hold_env_lock();
        let _g = clear_principal_env();
        let principal = resolve_mcp_principal(None);
        assert_eq!(principal, None);
    }

    /// A whitespace-only variable is treated as absent.
    #[test]
    fn stdio_whitespace_env_var_resolves_to_none() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("   \t  ");
        let principal = resolve_mcp_principal(None);
        assert_eq!(principal, None);
    }

    /// A `Bearer <token>` header resolves to the token.
    #[test]
    fn http_header_resolves_to_bearer_token_principal() {
        let _lock = hold_env_lock();
        let _g = clear_principal_env();
        let principal = resolve_mcp_principal(Some("Bearer api-token-xyz"));
        assert_eq!(principal.as_deref(), Some("api-token-xyz"));
    }

    /// When both sources are present, the header value wins.
    #[test]
    fn http_header_beats_env_var() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("env-token");
        let principal = resolve_mcp_principal(Some("Bearer header-token"));
        assert_eq!(
            principal.as_deref(),
            Some("header-token"),
            "header MUST win over env var per documented precedence"
        );
    }

    /// A non-Bearer header is ignored and the variable is used instead.
    #[test]
    fn http_malformed_header_falls_through_to_env() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("env-fallback");
        let principal = resolve_mcp_principal(Some("Basic dXNlcjpwYXNz"));
        assert_eq!(principal.as_deref(), Some("env-fallback"));
    }

    /// A Bearer header with a blank token is ignored in favour of the
    /// variable.
    #[test]
    fn http_empty_bearer_header_falls_through_to_env() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("env-fallback");
        let principal = resolve_mcp_principal(Some("Bearer    "));
        assert_eq!(principal.as_deref(), Some("env-fallback"));
    }

    /// Five consecutive resolutions all return the same value.
    #[test]
    fn stable_across_multiple_resolutions() {
        let _lock = hold_env_lock();
        let _g = set_principal_env("stable-token");
        (0..5).for_each(|_| {
            let principal = resolve_mcp_principal(None);
            assert_eq!(principal.as_deref(), Some("stable-token"));
        });
    }
}
#[cfg(test)]
mod initialize_decision_tests {
    //! Tests for `initialize_decision` and the text returned by
    //! `sampling_capability_missing_error_message`.

    use super::*;
    use solo_storage::LlmSettings;

    /// Asserts that `settings` yields `Allow` both without and with peer
    /// sampling support (checked in that order).
    fn assert_allowed_either_way(settings: &Option<LlmSettings>) {
        for peer_sampling_supported in [false, true] {
            assert_eq!(
                initialize_decision(settings, peer_sampling_supported),
                InitializeDecision::Allow
            );
        }
    }

    /// No LLM settings at all: always `Allow`.
    #[test]
    fn no_llm_block_allows_initialize_regardless_of_sampling_capability() {
        assert_allowed_either_way(&None);
    }

    /// `LlmSettings::None`: always `Allow`.
    #[test]
    fn llm_none_allows_initialize_regardless_of_sampling_capability() {
        assert_allowed_either_way(&Some(LlmSettings::None));
    }

    /// `LlmSettings::Anthropic`: always `Allow`.
    #[test]
    fn llm_anthropic_allows_initialize_regardless_of_sampling_capability() {
        let anthropic = LlmSettings::Anthropic {
            api_key_env: "ANTHROPIC_API_KEY".into(),
            model: "claude-sonnet-4-6".into(),
        };
        assert_allowed_either_way(&Some(anthropic));
    }

    /// `LlmSettings::Ollama`: always `Allow`.
    #[test]
    fn llm_ollama_allows_initialize_regardless_of_sampling_capability() {
        let ollama = LlmSettings::Ollama {
            base_url: "http://localhost:11434".into(),
            model: "qwen3-coder:30b".into(),
        };
        assert_allowed_either_way(&Some(ollama));
    }

    /// `McpSampling` with a sampling-capable peer: `PopulateSamplingSteward`.
    #[test]
    fn llm_mcp_sampling_with_sampling_capability_populates_slot() {
        let settings = Some(LlmSettings::McpSampling);
        let decision = initialize_decision(&settings, true);
        assert_eq!(decision, InitializeDecision::PopulateSamplingSteward);
    }

    /// `McpSampling` without a sampling-capable peer:
    /// `RejectMissingSamplingCapability`.
    #[test]
    fn llm_mcp_sampling_without_sampling_capability_rejects() {
        let settings = Some(LlmSettings::McpSampling);
        let decision = initialize_decision(&settings, false);
        assert_eq!(decision, InitializeDecision::RejectMissingSamplingCapability);
    }

    /// The rejection message must contain every fragment listed below: the
    /// backend name, each alternative's config lines, and the release-notes
    /// path.
    #[test]
    fn sampling_capability_missing_error_message_contains_all_alternatives() {
        let msg = sampling_capability_missing_error_message();
        let required_fragments = [
            "LLM backend `mcp_sampling`",
            "mode = \"anthropic\"",
            "api_key_env = \"ANTHROPIC_API_KEY\"",
            "mode = \"openai\"",
            "api_key_env = \"OPENAI_API_KEY\"",
            "mode = \"ollama\"",
            "base_url = \"http://localhost:11434\"",
            "mode = \"none\"",
            "docs/releases/v0.9.0.md",
        ];
        for fragment in required_fragments {
            assert!(msg.contains(fragment), "missing fragment: {fragment:?}");
        }
    }
}