use std::collections::HashMap;
use serde_json::{json, Value};
use crate::intelligence::memory_policy::{
blend_retrieval_priority, extract_features, score_policy, PolicyFeatureInput,
};
use crate::search::{hybrid_search, RerankConfig, RerankStrategy, Reranker};
use crate::storage::queries::get_policy_record;
use crate::types::*;
use super::HandlerContext;
/// Policy scoring details recorded for a single search result while
/// `memory_search` applies policy reranking.
#[derive(Clone)]
struct PolicyRerankInfo {
/// Retrieval priority taken from the policy score used for this result.
score: f32,
/// Value returned by `blend_retrieval_priority` for the hybrid score and the retrieval priority.
blended_score: f32,
/// Policy reason string copied from the policy score.
reason: String,
/// Policy version string copied from the policy score.
version: String,
/// Origin of the policy score; `memory_search` sets this to `"stored"` or `"heuristic-v1"`.
source: &'static str,
}
impl PolicyRerankInfo {
    /// Renders the policy details as a JSON object with the keys `score`,
    /// `blended_score`, `reason`, `version` and `source`.
    fn to_json(&self) -> Value {
        let Self {
            score,
            blended_score,
            reason,
            version,
            source,
        } = self;
        json!({
            "score": score,
            "blended_score": blended_score,
            "reason": reason,
            "version": version,
            "source": source
        })
    }
}
/// Runs a hybrid search for `params["query"]` with optional result caching,
/// policy reranking and reranking.
///
/// Parameters read directly from `params` (besides whatever `SearchOptions`
/// deserializes from the same object):
/// - `query` (string, defaults to `""`)
/// - `policy_rerank` (bool, default `false`): blend each hybrid score with a policy retrieval priority
/// - `policy_explain` (bool, default `false`): attach a `"policy"` object to each result (only with `policy_rerank`)
/// - `global` (bool, default `false`): clears the workspace filters and adds `"workspace"` to each result
/// - `rerank` (bool, default `true`) and `rerank_strategy` (`"none"`, `"multi_signal"`, anything else = heuristic)
/// - `skip_cache` (bool, default `false`)
///
/// Response shapes:
/// - cache hit: `{"results": [...], "cached": true}`
/// - reranked with `options.explain`: `{"results": [...], "reranked": true, "strategy": ...}`
/// - every other successful path: a bare JSON array of results
/// - storage/search failure: `{"error": "<message>"}`
pub fn memory_search(ctx: &HandlerContext, params: Value) -> Value {
use crate::search::result_cache::CacheFilterParams;
let query = params.get("query").and_then(|v| v.as_str()).unwrap_or("");
// Options that fail to deserialize fall back to `SearchOptions::default()`.
let mut options: SearchOptions = serde_json::from_value(params.clone()).unwrap_or_default();
let policy_rerank = params
.get("policy_rerank")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let policy_explain = params
.get("policy_explain")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let global = params
.get("global")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// A global search overrides any workspace filter supplied in the options.
if global {
options.global = true;
options.workspace = None;
options.workspaces = None;
}
let rerank_enabled = params
.get("rerank")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let rerank_strategy = match params.get("rerank_strategy").and_then(|v| v.as_str()) {
Some("none") => RerankStrategy::None,
Some("multi_signal") => RerankStrategy::MultiSignal,
_ => RerankStrategy::Heuristic,
};
// An embedding failure is not fatal: the search proceeds with `None` as the embedding.
let query_embedding = ctx.embedder.embed(query).ok();
let embedding_ref = query_embedding.as_deref();
// NOTE(review): the cache filters carry `options.workspace` but not `options.workspaces`;
// confirm that multi-workspace searches cannot collide in the cache.
let cache_filters = CacheFilterParams {
workspace: options.workspace.clone(),
tier: options.tier.map(|t| t.as_str().to_string()),
memory_types: options.memory_type.map(|t| vec![t]),
include_archived: options.include_archived,
include_transcripts: options.include_transcripts,
tags: options.tags.clone(),
global,
};
let skip_cache = params
.get("skip_cache")
.and_then(|v| v.as_bool())
.unwrap_or(false);
// The cache is consulted only when neither reranking nor policy reranking is requested.
if !skip_cache && !rerank_enabled && !policy_rerank {
if let Some(cached_results) = ctx.search_cache.get(query, embedding_ref, &cache_filters) {
if global {
let results_with_ws: Vec<Value> = cached_results
.iter()
.map(|r| {
json!({
"memory": r.memory,
"score": r.score,
"match_info": r.match_info,
"workspace": r.memory.workspace
})
})
.collect();
return json!({"results": results_with_ws, "cached": true});
}
return json!({"results": cached_results, "cached": true});
}
}
// Use the canonicalized current working directory as the project context path when available.
let mut search_config = ctx.search_config.clone();
if let Ok(cwd) = std::env::current_dir() {
if let Ok(canonical) = cwd.canonicalize() {
search_config.project_context_path = Some(canonical.to_string_lossy().to_string());
}
}
ctx.storage
.with_connection(|conn| {
let mut results = hybrid_search(conn, query, embedding_ref, &options, &search_config)?;
// Same condition as the cache lookup above; the stored results are the raw hybrid results.
if !rerank_enabled && !skip_cache && !policy_rerank {
ctx.search_cache.put(
query,
query_embedding.clone(),
cache_filters.clone(),
results.clone(),
);
}
// Policy details per memory id, used later when `policy_explain` is set.
let mut policy_info_by_memory_id = HashMap::new();
if policy_rerank {
for result in &mut results {
let hybrid_score = result.score;
let stored_policy = get_policy_record(conn, result.memory.id)?;
// Prefer the stored policy record; otherwise score the memory from extracted features.
let (policy_score, source) = if let Some(policy) = stored_policy.as_ref() {
(
crate::intelligence::memory_policy::PolicyScore {
salience_score: policy.salience_score,
retention_score: policy.retention_score,
retrieval_priority: policy.retrieval_priority,
policy_version: policy.policy_version.clone(),
policy_reason: policy.policy_reason.clone(),
},
"stored",
)
} else {
let features = extract_features(PolicyFeatureInput {
memory: &result.memory,
existing_policy: None,
event: None,
hybrid_search_score: Some(hybrid_score),
session_relevance: None,
});
(score_policy(&features), "heuristic-v1")
};
let blended_score =
blend_retrieval_priority(hybrid_score, policy_score.retrieval_priority);
// The blended score replaces the hybrid score on the result itself.
result.score = blended_score;
policy_info_by_memory_id.insert(
result.memory.id,
PolicyRerankInfo {
score: policy_score.retrieval_priority,
blended_score,
reason: policy_score.policy_reason,
version: policy_score.policy_version,
source,
},
);
}
// Highest blended score first; incomparable scores (e.g. NaN) are treated as equal.
results.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
}
// Reranking runs after policy reranking and reports `rerank_info.final_score` as the score.
if rerank_enabled && rerank_strategy != RerankStrategy::None {
let config = RerankConfig {
enabled: true,
strategy: rerank_strategy,
..Default::default()
};
let reranker = Reranker::with_config(config);
let reranked = reranker.rerank(results, query, Some(conn));
if options.explain {
// Explain mode wraps the results in an object and includes `rerank_info` per result.
Ok(json!({
"results": reranked.iter().map(|r| {
let mut obj = json!({
"memory": r.result.memory,
"score": r.rerank_info.final_score,
"match_info": r.result.match_info,
"rerank_info": r.rerank_info
});
if global {
obj["workspace"] = json!(r.result.memory.workspace);
}
if policy_rerank && policy_explain {
if let Some(policy) = policy_info_by_memory_id.get(&r.result.memory.id) {
obj["policy"] = policy.to_json();
}
}
obj
}).collect::<Vec<_>>(),
"reranked": true,
"strategy": format!("{:?}", rerank_strategy)
}))
} else {
// Without explain the reranked results are returned as a bare array.
Ok(json!(reranked
.iter()
.map(|r| {
let mut obj = json!({
"memory": r.result.memory,
"score": r.rerank_info.final_score,
"match_info": r.result.match_info
});
if global {
obj["workspace"] = json!(r.result.memory.workspace);
}
if policy_rerank && policy_explain {
if let Some(policy) = policy_info_by_memory_id.get(&r.result.memory.id) {
obj["policy"] = policy.to_json();
}
}
obj
})
.collect::<Vec<_>>()))
}
} else if global {
// No reranking, global search: bare array with the workspace on every result.
Ok(json!(results
.iter()
.map(|r| {
let mut obj = json!({
"memory": r.memory,
"score": r.score,
"match_info": r.match_info,
"workspace": r.memory.workspace
});
if policy_rerank && policy_explain {
if let Some(policy) = policy_info_by_memory_id.get(&r.memory.id) {
obj["policy"] = policy.to_json();
}
}
obj
})
.collect::<Vec<_>>()))
} else {
// No reranking, workspace-scoped search: bare array of results.
Ok(json!(results
.iter()
.map(|r| {
let mut obj = json!({
"memory": r.memory,
"score": r.score,
"match_info": r.match_info
});
if policy_rerank && policy_explain {
if let Some(policy) = policy_info_by_memory_id.get(&r.memory.id) {
obj["policy"] = policy.to_json();
}
}
obj
})
.collect::<Vec<_>>()))
}
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Passes `params["query"]` (or `""` when absent or not a string) to
/// `fuzzy_engine.correct_query` and returns that result serialized as JSON.
pub fn search_suggest(ctx: &HandlerContext, params: Value) -> Value {
    let raw_query = match params.get("query") {
        Some(value) => value.as_str().unwrap_or(""),
        None => "",
    };
    let engine = ctx.fuzzy_engine.lock();
    let suggestion = engine.correct_query(raw_query);
    json!(suggestion)
}
pub fn memory_search_by_identity(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::search_by_identity;
let identity = match params.get("identity").and_then(|v| v.as_str()) {
Some(i) => i,
None => return json!({"error": "identity is required"}),
};
let workspace = params.get("workspace").and_then(|v| v.as_str());
let limit = params
.get("limit")
.and_then(|v| v.as_u64())
.map(|v| v as usize);
ctx.storage
.with_connection(|conn| {
let memories = search_by_identity(conn, identity, workspace, limit)?;
Ok(json!({"memories": memories}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_session_search(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::search_sessions;
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(q) => q,
None => return json!({"error": "query is required"}),
};
let session_id = params.get("session_id").and_then(|v| v.as_str());
let workspace = params.get("workspace").and_then(|v| v.as_str());
let limit = params
.get("limit")
.and_then(|v| v.as_u64())
.map(|v| v as usize);
ctx.storage
.with_connection(|conn| {
let memories = search_sessions(conn, query, session_id, workspace, limit)?;
Ok(json!({"memories": memories}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn find_duplicates(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::queries::find_duplicates;
let threshold = params
.get("threshold")
.and_then(|v| v.as_f64())
.unwrap_or(0.9);
ctx.storage
.with_connection(|conn| {
let duplicates = find_duplicates(conn, threshold)?;
Ok(json!({
"count": duplicates.len(),
"threshold": threshold,
"duplicates": duplicates
}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Reports duplicate memories found by `find_duplicates_by_embedding`.
///
/// Parameters read from `params`:
/// - `threshold`: optional number, default `0.92`, narrowed to `f32`
/// - `workspace`: optional string
/// - `limit`: optional non-negative integer, default `50`
///
/// The response carries `count`, `threshold`, a fixed `method` label and the
/// `duplicates`, or `{"error": ...}` if the storage call fails.
pub fn find_semantic_duplicates(ctx: &HandlerContext, params: Value) -> Value {
    use crate::storage::queries::find_duplicates_by_embedding;

    let threshold = params
        .get("threshold")
        .and_then(|v| v.as_f64())
        .unwrap_or(0.92) as f32;
    let workspace = params.get("workspace").and_then(|v| v.as_str());
    // Parse as unsigned: casting a negative i64 to usize would wrap into an
    // enormous limit. Negative or non-integer values now fall back to the
    // default, matching how the other handlers in this file read `limit`.
    let limit = params
        .get("limit")
        .and_then(|v| v.as_u64())
        .map(|v| v as usize)
        .unwrap_or(50);

    ctx.storage
        .with_connection(|conn| {
            let duplicates = find_duplicates_by_embedding(conn, threshold, workspace, limit)?;
            Ok(json!({
                "count": duplicates.len(),
                "threshold": threshold,
                "method": "embedding_cosine_similarity",
                "duplicates": duplicates
            }))
        })
        .unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn search_cache_feedback(ctx: &HandlerContext, params: Value) -> Value {
use crate::search::CacheFilterParams;
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(q) => q,
None => return json!({"error": "query is required"}),
};
let positive = match params.get("positive").and_then(|v| v.as_bool()) {
Some(p) => p,
None => return json!({"error": "positive is required"}),
};
let workspace = params
.get("workspace")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let filters = CacheFilterParams {
workspace,
..Default::default()
};
ctx.search_cache.record_feedback(query, &filters, positive);
let new_threshold = ctx.search_cache.current_threshold();
json!({
"recorded": true,
"feedback": if positive { "positive" } else { "negative" },
"current_threshold": new_threshold
})
}
/// Returns the search cache statistics serialized as JSON; `_params` is ignored.
pub fn search_cache_stats(ctx: &HandlerContext, _params: Value) -> Value {
    let snapshot = ctx.search_cache.stats();
    json!(snapshot)
}
pub fn search_cache_clear(ctx: &HandlerContext, params: Value) -> Value {
let workspace = params.get("workspace").and_then(|v| v.as_str());
if let Some(ws) = workspace {
ctx.search_cache.invalidate_for_workspace(Some(ws));
json!({"cleared": true, "scope": "workspace", "workspace": ws})
} else {
ctx.search_cache.clear();
json!({"cleared": true, "scope": "all"})
}
}
pub fn memory_explain_search(_ctx: &HandlerContext, params: Value) -> Value {
use crate::search::explain::SearchExplainer;
let results = match params.get("results").and_then(|v| v.as_array()) {
Some(arr) => arr,
None => {
return json!({"error": "results array is required (each with memory_id, bm25, vector, fuzzy, recency, importance, final_score, and optional rerank_score)"})
}
};
let reranking_active = params
.get("reranking_active")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let rrf_k = params.get("rrf_k").and_then(|v| v.as_f64()).unwrap_or(60.0) as f32;
let explainer = SearchExplainer::new(rrf_k, reranking_active);
let batch: Vec<_> = results
.iter()
.filter_map(|r| {
let memory_id = r.get("memory_id")?.as_i64()?;
let bm25 = r.get("bm25").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
let vector = r.get("vector").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
let fuzzy = r.get("fuzzy").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
let recency = r.get("recency").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
let importance = r.get("importance").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
let rerank = r
.get("rerank_score")
.and_then(|v| v.as_f64())
.map(|v| v as f32);
let final_score = r.get("final_score").and_then(|v| v.as_f64()).unwrap_or(0.0) as f32;
Some((
memory_id,
bm25,
vector,
fuzzy,
recency,
importance,
rerank,
final_score,
))
})
.collect();
let explanations = explainer.explain_batch(batch);
json!({
"count": explanations.len(),
"explanations": explanations
})
}
pub fn memory_feedback(ctx: &HandlerContext, params: Value) -> Value {
use crate::search::feedback::{record_feedback, FeedbackSignal};
use crate::storage::feedback::FeedbackProcessor;
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(q) => q,
None => return json!({"error": "query is required"}),
};
let memory_id = match params.get("memory_id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "memory_id is required"}),
};
let (feedback_signal, signal_str): (FeedbackSignal, &str) = match params
.get("signal")
.and_then(|v| v.as_str())
{
Some("useful") | Some("helpful") => (FeedbackSignal::Useful, "helpful"),
Some("irrelevant") | Some("not_helpful") => (FeedbackSignal::Irrelevant, "not_helpful"),
Some("outdated") => (FeedbackSignal::Outdated, "outdated"),
Some("conflict") => (FeedbackSignal::Conflict, "conflict"),
_ => {
return json!({"error": "signal must be 'helpful'/'useful', 'not_helpful'/'irrelevant', 'outdated', or 'conflict'"});
}
};
let rank_position = params
.get("rank_position")
.and_then(|v| v.as_i64())
.map(|v| v as i32);
let original_score = params
.get("original_score")
.and_then(|v| v.as_f64())
.map(|v| v as f32);
let workspace = params
.get("workspace")
.and_then(|v| v.as_str())
.unwrap_or("default");
ctx.storage
.with_connection(|conn| {
let fb = record_feedback(
conn,
query,
memory_id,
feedback_signal,
rank_position,
original_score,
workspace,
)?;
let consolidator =
std::sync::Arc::new(crate::mcp::handlers::auto_consolidate::QueueingConsolidator);
let processor = FeedbackProcessor::new().with_consolidator(consolidator);
let (new_score, scheduled) = processor.process_feedback(memory_id, signal_str, conn)?;
let mut result = json!(fb);
if let Some(obj) = result.as_object_mut() {
obj.insert("utility_score_after".to_string(), json!(new_score));
obj.insert("scheduled_for_consolidation".to_string(), json!(scheduled));
}
Ok(result)
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_feedback_stats(ctx: &HandlerContext, params: Value) -> Value {
use crate::search::feedback::feedback_stats;
let workspace = params.get("workspace").and_then(|v| v.as_str());
ctx.storage
.with_connection(|conn| {
let stats = feedback_stats(conn, workspace)?;
Ok(json!(stats))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_explain_utility(ctx: &HandlerContext, params: Value) -> Value {
use crate::search::utility::UtilityTracker;
let memory_id = match params.get("memory_id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "memory_id is required"}),
};
ctx.storage
.with_connection(|conn| {
let tracker = UtilityTracker::new();
let explanation = tracker.explain_utility(conn, memory_id)?;
Ok(json!(explanation))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Runs a hybrid search and returns a compact listing: id, a short title,
/// creation time and tags per result.
///
/// `query` is required. `global` (bool, default `false`) clears the workspace
/// filters and adds `"workspace"` to every entry. When the deserialized
/// options carry no limit, `params["limit"]` is used, defaulting to `10`.
/// The response is `{"results": [...], "count": n}`, or `{"error": ...}` when
/// `query` is missing or the search fails.
pub fn memory_search_compact(ctx: &HandlerContext, params: Value) -> Value {
    /// Maximum number of bytes of the first line kept in a title.
    const TITLE_MAX_BYTES: usize = 80;

    /// Derives the title from the first line of `content`: truncated with
    /// `...` when longer than the byte budget, suffixed with `...` when more
    /// lines follow, otherwise returned unchanged.
    fn compact_title(content: &str) -> String {
        let first_line = content.lines().next().unwrap_or("");
        if first_line.len() > TITLE_MAX_BYTES {
            // Slicing in the middle of a multi-byte UTF-8 character panics,
            // so back up to the nearest char boundary at or below the budget.
            let mut cut = TITLE_MAX_BYTES;
            while !first_line.is_char_boundary(cut) {
                cut -= 1;
            }
            format!("{}...", &first_line[..cut])
        } else if content.contains('\n') {
            format!("{}...", first_line)
        } else {
            first_line.to_string()
        }
    }

    let query = match params.get("query").and_then(|v| v.as_str()) {
        Some(q) => q,
        None => return json!({"error": "query is required"}),
    };
    // Options that fail to deserialize fall back to `SearchOptions::default()`.
    let mut options: SearchOptions = serde_json::from_value(params.clone()).unwrap_or_default();
    let global = params
        .get("global")
        .and_then(|v| v.as_bool())
        .unwrap_or(false);
    // A global search overrides any workspace filter supplied in the options.
    if global {
        options.global = true;
        options.workspace = None;
        options.workspaces = None;
    }
    if options.limit.is_none() {
        let limit_from_param = params.get("limit").and_then(|v| v.as_i64());
        options.limit = Some(limit_from_param.unwrap_or(10));
    }

    // An embedding failure is not fatal: the search proceeds without one.
    let query_embedding = ctx.embedder.embed(query).ok();
    let embedding_ref = query_embedding.as_deref();

    // Use the canonicalized current working directory as the project context
    // path when it can be resolved.
    let mut search_config = ctx.search_config.clone();
    if let Ok(cwd) = std::env::current_dir() {
        if let Ok(canonical) = cwd.canonicalize() {
            search_config.project_context_path = Some(canonical.to_string_lossy().to_string());
        }
    }

    ctx.storage
        .with_connection(|conn| {
            let results = hybrid_search(conn, query, embedding_ref, &options, &search_config)?;
            let compact: Vec<Value> = results
                .iter()
                .map(|r| {
                    let memory = &r.memory;
                    let title_str = compact_title(&memory.content);
                    let mut obj = json!({
                        "id": memory.id,
                        "title": title_str,
                        "created_at": memory.created_at,
                        "tags": memory.tags
                    });
                    if global {
                        obj["workspace"] = json!(memory.workspace);
                    }
                    obj
                })
                .collect();
            Ok(json!({
                "results": compact,
                "count": compact.len()
            }))
        })
        .unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Lists recently created, currently valid memories (`valid_to IS NULL`),
/// newest activity first.
///
/// Parameters read from `params`:
/// - `workspace`: optional string; restricts rows to that workspace
/// - `timeframe`: `"1h"`, `"24h"`, `"7d"` or `"30d"` (default `"24h"`); any
///   other value also uses the 24-hour window, while the response echoes the
///   value as given
/// - `limit`: optional unsigned integer, default `20`, capped at `100`
/// - `include_types`: optional array of strings matched against `memory_type`
///
/// Each activity carries a preview of at most 100 characters of the content.
/// The response is `{"activities", "count", "timeframe", "workspace"}`, or
/// `{"error": ...}` if preparing or running the query fails.
pub fn recent_activity(ctx: &HandlerContext, params: Value) -> Value {
    let workspace = params.get("workspace").and_then(|v| v.as_str());
    let timeframe = params
        .get("timeframe")
        .and_then(|v| v.as_str())
        .unwrap_or("24h");
    let limit = params
        .get("limit")
        .and_then(|v| v.as_u64())
        .unwrap_or(20)
        .min(100) as i64;
    let include_types: Option<Vec<String>> = params
        .get("include_types")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        });

    // Only fixed, known clauses are ever spliced into the SQL text; all
    // caller-supplied values are bound as parameters below.
    let time_clause = match timeframe {
        "1h" => "AND created_at > datetime('now', '-1 hours')",
        "24h" => "AND created_at > datetime('now', '-24 hours')",
        "7d" => "AND created_at > datetime('now', '-7 days')",
        "30d" => "AND created_at > datetime('now', '-30 days')",
        _ => "AND created_at > datetime('now', '-24 hours')",
    };
    let workspace_owned = workspace.map(String::from);
    let timeframe_owned = timeframe.to_string();

    ctx.storage
        .with_connection(|conn| {
            let mut sql = format!(
                "SELECT m.id, m.content, m.memory_type, m.importance, m.workspace, \
                 m.created_at, m.updated_at, \
                 (SELECT GROUP_CONCAT(t.name, ',') FROM memory_tags mt \
                 JOIN tags t ON mt.tag_id = t.id WHERE mt.memory_id = m.id) AS tags \
                 FROM memories m WHERE m.valid_to IS NULL {} ",
                time_clause
            );
            // Placeholders are numbered (?1, ?2, ...) in the order values are pushed.
            let mut param_values: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
            if let Some(ref ws) = workspace_owned {
                sql.push_str(&format!("AND m.workspace = ?{} ", param_values.len() + 1));
                param_values.push(Box::new(ws.clone()));
            }
            if let Some(ref types) = include_types {
                if !types.is_empty() {
                    let placeholders: Vec<String> = types
                        .iter()
                        .enumerate()
                        .map(|(i, _)| format!("?{}", param_values.len() + i + 1))
                        .collect();
                    sql.push_str(&format!(
                        "AND m.memory_type IN ({}) ",
                        placeholders.join(",")
                    ));
                    for t in types {
                        param_values.push(Box::new(t.clone()));
                    }
                }
            }
            sql.push_str(&format!(
                "ORDER BY COALESCE(m.updated_at, m.created_at) DESC LIMIT ?{}",
                param_values.len() + 1
            ));
            param_values.push(Box::new(limit));

            let mut stmt = conn.prepare(&sql)?;
            let rows = stmt.query_map(
                rusqlite::params_from_iter(param_values.iter().map(|p| p.as_ref())),
                |row| {
                    let content: String = row.get(1)?;
                    // Truncate by characters, not bytes, so the preview never
                    // splits a multi-byte character.
                    let char_count = content.chars().count();
                    let preview: String = content.chars().take(100).collect();
                    let preview = if char_count > 100 {
                        format!("{}...", preview)
                    } else {
                        preview
                    };
                    Ok(json!({
                        "id": row.get::<_, i64>(0)?,
                        "preview": preview,
                        "memory_type": row.get::<_, String>(2)?,
                        "importance": row.get::<_, f64>(3)?,
                        "workspace": row.get::<_, String>(4)?,
                        "created_at": row.get::<_, String>(5)?,
                        // The ORDER BY above uses COALESCE on `updated_at`, so
                        // the column may be NULL. Reading it as a plain String
                        // would fail for such rows, and the best-effort filter
                        // below would then silently drop them.
                        "updated_at": row.get::<_, Option<String>>(6)?,
                        "tags": row.get::<_, Option<String>>(7)?
                    }))
                },
            )?;
            // Best effort: rows that still fail to decode are skipped rather
            // than failing the whole request.
            let activities: Vec<Value> = rows.filter_map(|r| r.ok()).collect();
            let count = activities.len();
            Ok(json!({
                "activities": activities,
                "count": count,
                "timeframe": timeframe_owned,
                "workspace": workspace_owned
            }))
        })
        .unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_expand(ctx: &HandlerContext, params: Value) -> Value {
use crate::error::EngramError;
use crate::storage::queries::get_memory;
let ids: Vec<i64> = match params.get("ids").and_then(|v| v.as_array()) {
Some(arr) => arr.iter().filter_map(|v| v.as_i64()).collect(),
None => return json!({"error": "ids array is required"}),
};
let requested = ids.len();
ctx.storage
.with_connection(|conn| {
let mut memories: Vec<Value> = Vec::with_capacity(ids.len());
for id in &ids {
match get_memory(conn, *id) {
Ok(memory) => memories.push(json!(memory)),
Err(EngramError::NotFound(_)) => {
}
Err(e) => return Err(e),
}
}
let found = memories.len();
Ok(json!({
"memories": memories,
"found": found,
"requested": requested
}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}