use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _};
use chrono::Utc;
use serde::Deserialize;
use serde_json::{json, Value};
use super::HandlerContext;
use crate::storage::enrichment_events::{emit_best_effort, EnrichmentEvent};
/// Returns the longest prefix of `s` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
///
/// A string that already fits is returned unchanged. Slicing never panics,
/// because the cut point is always moved back to a valid boundary.
fn safe_truncate(s: &str, max_bytes: usize) -> &str {
    if max_bytes >= s.len() {
        return s;
    }
    // Walk backwards from the requested limit; offset 0 is always a boundary,
    // so the search is guaranteed to succeed.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&offset| s.is_char_boundary(offset))
        .unwrap_or(0);
    &s[..cut]
}
/// Handles a `context_record` call.
///
/// Builds the operational-context policy from the raw parameters, deserializes
/// the parameters into a `ContextRecordRequest`, and records the context
/// through the storage connection.
///
/// Returns the serialized response on success, or `{"error": <message>}` when
/// deserialization or the storage operation fails.
pub fn context_record(ctx: &HandlerContext, params: Value) -> Value {
    // The policy only needs a borrow; `params` itself is consumed by
    // deserialization right after.
    let policy = crate::context::policy::OperationalContextPolicy::from_params(&params);
    let request: crate::context::ContextRecordRequest = match serde_json::from_value(params) {
        Ok(request) => request,
        Err(e) => return json!({"error": e.to_string()}),
    };
    ctx.storage
        .with_connection(|conn| crate::context::record_context(conn, &policy, request))
        .map(|response| json!(response))
        .unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Handles a `context_record_artifact` call.
///
/// Builds the operational-context policy from the raw parameters, deserializes
/// the parameters into a `ContextRecordArtifactRequest`, and records the
/// artifact through the storage connection.
///
/// Returns the serialized response on success, or `{"error": <message>}` when
/// deserialization or the storage operation fails.
pub fn context_record_artifact(ctx: &HandlerContext, params: Value) -> Value {
    // The policy only needs a borrow; `params` itself is consumed by
    // deserialization right after.
    let policy = crate::context::policy::OperationalContextPolicy::from_params(&params);
    let request: crate::context::ContextRecordArtifactRequest =
        match serde_json::from_value(params) {
            Ok(request) => request,
            Err(e) => return json!({"error": e.to_string()}),
        };
    ctx.storage
        .with_connection(|conn| crate::context::record_context_artifact(conn, &policy, request))
        .map(|response| json!(response))
        .unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Wire-format parameters accepted by `context_get_artifact`.
///
/// Most fields are forwarded unchanged into
/// `crate::storage::ArtifactRetrievalRequest`.
#[derive(Debug, Deserialize)]
struct ContextGetArtifactRequest {
/// Identifier of the artifact to retrieve.
artifact_id: String,
requester_agent_id: Option<String>,
session_id: Option<String>,
task_id: Option<String>,
repo_id: Option<String>,
/// Preferred workspace selector; takes precedence over `workspace`.
workspace_path_hash: Option<String>,
/// Fallback used as the workspace path hash when `workspace_path_hash` is absent.
#[serde(default)]
workspace: Option<String>,
/// Forwarded to the storage request as-is.
// NOTE(review): presumably caps the returned content size — confirm in storage.
max_bytes: Option<usize>,
/// Forwarded to the storage request; defaults to `false` when omitted.
#[serde(default)]
allow_stale: bool,
/// When `true` (the default), the handler rejects artifacts whose redaction
/// status does not report `allows_raw_storage()`.
#[serde(default = "default_require_redacted")]
require_redacted: bool,
/// Mandatory justification; the handler rejects blank values.
reason: String,
}
/// Serde default for `ContextGetArtifactRequest::require_redacted`: the
/// redaction check is on unless the caller explicitly disables it.
fn default_require_redacted() -> bool {
true
}
pub fn context_get_artifact(ctx: &HandlerContext, params: Value) -> Value {
let request: ContextGetArtifactRequest = match serde_json::from_value(params) {
Ok(request) => request,
Err(e) => return json!({"error": e.to_string()}),
};
if request.reason.trim().is_empty() {
return json!({"error": "context_get_artifact requires a non-empty reason"});
}
let require_redacted = request.require_redacted;
let storage_request = crate::storage::ArtifactRetrievalRequest {
artifact_id: request.artifact_id,
requester_agent_id: request.requester_agent_id,
session_id: request.session_id,
task_id: request.task_id,
repo_id: request.repo_id,
workspace_path_hash: request.workspace_path_hash.or(request.workspace),
max_bytes: request.max_bytes,
allow_stale: request.allow_stale,
reason: Some(request.reason),
};
ctx.storage
.with_connection(|conn| {
crate::storage::retrieve_context_artifact_raw(conn, storage_request)
})
.map(|retrieved| {
if require_redacted && !retrieved.artifact.redaction_status.allows_raw_storage() {
return json!({
"error": "artifact redaction status does not permit raw retrieval",
"redaction_status": retrieved.artifact.redaction_status.as_str()
});
}
let now = Utc::now();
let stale = retrieved.artifact.is_stale_at(now);
let expired = retrieved.artifact.is_expired_at(now);
let returned_bytes = retrieved.returned_bytes;
let original_bytes = retrieved.original_bytes;
let truncated = retrieved.truncated;
let artifact = retrieved.artifact;
let (encoding, content) = match String::from_utf8(retrieved.content) {
Ok(content) => ("utf8", content),
Err(err) => ("base64", BASE64.encode(err.into_bytes())),
};
json!({
"artifact": artifact,
"content": content,
"encoding": encoding,
"returned_bytes": returned_bytes,
"original_bytes": original_bytes,
"truncated": truncated,
"stale": stale,
"expired": expired
})
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn context_search(ctx: &HandlerContext, params: Value) -> Value {
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(query) if !query.trim().is_empty() => query,
_ => return json!({"error": "query is required"}),
};
let mut request: crate::context::ContextSearchRequest =
match serde_json::from_value(params.clone()) {
Ok(request) => request,
Err(e) => return json!({"error": e.to_string()}),
};
request.query = Some(query.to_string());
ctx.storage
.with_connection(|conn| crate::context::search_context(conn, &request))
.map(|response| json!(response))
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn context_build_bundle(ctx: &HandlerContext, params: Value) -> Value {
let request: crate::context::ContextBundleRequest = match serde_json::from_value(params) {
Ok(request) => request,
Err(e) => return json!({"error": e.to_string()}),
};
ctx.storage
.with_connection(|conn| crate::context::build_context_bundle(conn, &request))
.map(|bundle| json!(bundle))
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_extract_facts(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::fact_extraction::{
create_fact, ConversationProcessor, RuleBasedExtractor,
};
let memory_id = match params.get("memory_id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "memory_id is required"}),
};
ctx.storage
.with_connection(|conn| {
let content: Option<String> = conn
.query_row(
"SELECT content FROM memories WHERE id = ?1",
rusqlite::params![memory_id],
|row| row.get(0),
)
.ok();
let content = match content {
Some(c) => c,
None => {
return Ok(json!({"error": format!("memory {} not found", memory_id)}));
}
};
let processor = ConversationProcessor::new(Box::new(RuleBasedExtractor::new()));
let extracted = processor.process_text(&content, Some(memory_id));
let mut stored = Vec::new();
for fact in &extracted {
if let Ok(f) = create_fact(conn, fact, Some(memory_id)) {
stored.push(json!({
"id": f.id,
"subject": f.subject,
"predicate": f.predicate,
"object": f.object,
"confidence": f.confidence
}));
}
}
let facts_stored = stored.len();
let result = json!({
"memory_id": memory_id,
"facts_extracted": extracted.len(),
"facts_stored": facts_stored,
"facts": stored
});
if facts_stored > 0 {
let op_id = uuid::Uuid::new_v4().to_string();
emit_best_effort(
conn,
&EnrichmentEvent {
operation_id: &op_id,
event_type: "fact_ingest",
memory_id: Some(memory_id),
version_id: None,
triggered_by: "memory_extract_facts",
agent_id: None,
workspace: None,
params: json!({}),
outcome: json!({"facts_stored": facts_stored}),
status: "completed",
dry_run: false,
},
);
}
Ok(result)
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_list_facts(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::fact_extraction::list_facts;
let source_id = params.get("memory_id").and_then(|v| v.as_i64());
let limit = params.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize;
ctx.storage
.with_connection(|conn| {
let facts = list_facts(conn, source_id, limit)?;
let items: Vec<Value> = facts
.iter()
.map(|f| {
json!({
"id": f.id,
"subject": f.subject,
"predicate": f.predicate,
"object": f.object,
"confidence": f.confidence,
"source_memory_id": f.source_memory_id,
"created_at": f.created_at
})
})
.collect();
Ok(json!({"facts": items, "count": items.len()}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_fact_graph(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::fact_extraction::get_fact_graph;
let subject = match params.get("subject").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => return json!({"error": "subject is required"}),
};
ctx.storage
.with_connection(|conn| {
let facts = get_fact_graph(conn, &subject)?;
let items: Vec<Value> = facts
.iter()
.map(|f| {
json!({
"id": f.id,
"subject": f.subject,
"predicate": f.predicate,
"object": f.object,
"confidence": f.confidence,
"source_memory_id": f.source_memory_id
})
})
.collect();
Ok(json!({"subject": subject, "facts": items, "count": items.len()}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_build_context(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::context_builder::{
ContextBuilder, MemoryEntry, PromptTemplate, Section, SimpleTokenCounter, Strategy,
};
use crate::search::hybrid_search;
use crate::types::SearchOptions;
use chrono::{Duration, Utc};
use std::collections::HashSet;
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(q) => q.to_string(),
None => return json!({"error": "query is required"}),
};
let total_budget = params
.get("total_budget")
.and_then(|v| v.as_u64())
.unwrap_or(4096) as usize;
let strategy = match params.get("strategy").and_then(|v| v.as_str()) {
Some("balanced") => Strategy::Balanced,
Some("recency") => Strategy::Recency,
_ => Strategy::Greedy,
};
let limit = params.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as usize;
let depth = params
.get("depth")
.and_then(|v| v.as_u64())
.unwrap_or(1)
.clamp(1, 3) as usize;
let timeframe = params
.get("timeframe")
.and_then(|v| v.as_str())
.unwrap_or("all");
let include_types: Option<Vec<String>> = params
.get("include_types")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
});
let include_graph = params
.get("include_graph")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let time_cutoff = match timeframe {
"1h" => Some(Utc::now() - Duration::hours(1)),
"24h" => Some(Utc::now() - Duration::hours(24)),
"7d" => Some(Utc::now() - Duration::days(7)),
"30d" => Some(Utc::now() - Duration::days(30)),
_ => None, };
let search_opts = SearchOptions {
workspace: params
.get("workspace")
.and_then(|v| v.as_str())
.map(str::to_string),
limit: Some(limit as i64),
..Default::default()
};
let query_embedding = ctx.embedder.embed(&query).ok();
let embedding_ref = query_embedding.as_deref();
let search_result = ctx.storage.with_connection(|conn| {
hybrid_search(
conn,
&query,
embedding_ref,
&search_opts,
&ctx.search_config,
)
});
let mut memories = match search_result {
Ok(results) => results,
Err(e) => return json!({"error": e.to_string()}),
};
if let Some(cutoff) = time_cutoff {
memories.retain(|r| r.memory.created_at >= cutoff);
}
if let Some(ref types) = include_types {
memories.retain(|r| types.contains(&r.memory.memory_type.as_str().to_string()));
}
let mut all_memory_contents: Vec<(String, chrono::DateTime<Utc>)> = memories
.iter()
.map(|r| (r.memory.content.clone(), r.memory.created_at))
.collect();
let mut all_memory_ids: Vec<i64> = memories.iter().map(|r| r.memory.id).collect();
if depth > 1 {
let mut seen_ids: HashSet<i64> = all_memory_ids.iter().copied().collect();
let mut frontier: Vec<i64> = all_memory_ids.clone();
for _hop in 1..depth {
let mut next_frontier = Vec::new();
for id in &frontier {
if let Ok(related) = ctx.storage.with_connection(|conn| {
let mut stmt = conn.prepare(
"SELECT DISTINCT to_id FROM crossrefs WHERE from_id = ?1
UNION
SELECT DISTINCT from_id FROM crossrefs WHERE to_id = ?1",
)?;
let ids: Vec<i64> = stmt
.query_map(rusqlite::params![id], |row| row.get(0))?
.filter_map(|r| r.ok())
.collect();
Ok(ids)
}) {
for rid in related {
if seen_ids.insert(rid) {
next_frontier.push(rid);
}
}
}
}
if next_frontier.is_empty() {
break;
}
for id in &next_frontier {
if let Ok(mem) = ctx
.storage
.with_connection(|conn| crate::storage::queries::get_memory(conn, *id))
{
if let Some(cutoff) = time_cutoff {
if mem.created_at < cutoff {
continue;
}
}
if let Some(ref types) = include_types {
if !types.contains(&mem.memory_type.as_str().to_string()) {
continue;
}
}
all_memory_contents.push((mem.content.clone(), mem.created_at));
all_memory_ids.push(mem.id);
}
}
frontier = next_frontier;
}
}
let entries: Vec<MemoryEntry> = all_memory_contents
.iter()
.map(|(content, created_at)| MemoryEntry::new(content.clone(), *created_at))
.collect();
let template = PromptTemplate {
sections: vec![Section {
name: "Memories".to_string(),
content: String::new(),
max_tokens: total_budget,
priority: 0,
}],
total_budget,
separator: "\n\n---\n\n".to_string(),
};
let builder = ContextBuilder::new(Box::new(SimpleTokenCounter));
let prompt = builder.build(&template, &entries, strategy);
let token_estimate = builder.estimate_tokens(&prompt);
let graph = if include_graph {
ctx.storage
.with_connection(|conn| {
let mut edges = Vec::new();
for id in &all_memory_ids {
let mut stmt = conn.prepare(
"SELECT from_id, to_id, edge_type FROM crossrefs
WHERE from_id = ?1 OR to_id = ?1",
)?;
let rows: Vec<Value> = stmt
.query_map(rusqlite::params![id], |row| {
Ok(json!({
"source": row.get::<_, i64>(0)?,
"target": row.get::<_, i64>(1)?,
"relation": row.get::<_, String>(2)?
}))
})?
.filter_map(|r| r.ok())
.collect();
edges.extend(rows);
}
Ok(json!({"edges": edges, "node_count": all_memory_ids.len()}))
})
.unwrap_or_else(|_| json!({"edges": [], "node_count": 0}))
} else {
Value::Null
};
let mut response = json!({
"prompt": prompt,
"token_estimate": token_estimate,
"memories_used": entries.len(),
"total_budget": total_budget,
"depth": depth,
"timeframe": timeframe
});
if include_graph {
response
.as_object_mut()
.expect("response is an object")
.insert("graph".to_string(), graph);
}
response
}
pub fn memory_block_get(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::memory_blocks::get_block;
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "name is required"}),
};
ctx.storage
.with_connection(|conn| match get_block(conn, &name)? {
Some(block) => Ok(json!(block)),
None => Ok(json!({"error": format!("block '{}' not found", name)})),
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_block_edit(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::memory_blocks::update_block;
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "name is required"}),
};
let content = match params.get("content").and_then(|v| v.as_str()) {
Some(c) => c.to_string(),
None => return json!({"error": "content is required"}),
};
let reason = params
.get("reason")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
ctx.storage
.with_connection(|conn| {
let block = update_block(conn, &name, &content, &reason)?;
Ok(json!(block))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_block_list(ctx: &HandlerContext, _params: Value) -> Value {
use crate::storage::memory_blocks::list_blocks;
ctx.storage
.with_connection(|conn| {
let blocks = list_blocks(conn)?;
Ok(json!({"blocks": blocks, "count": blocks.len()}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_block_create(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::memory_blocks::create_block;
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "name is required"}),
};
let content = params
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let max_tokens = params
.get("max_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(4096) as usize;
ctx.storage
.with_connection(|conn| {
let block = create_block(conn, &name, &content, max_tokens)?;
Ok(json!(block))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_block_archive(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::memory_blocks::{delete_block, get_block};
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "name is required"}),
};
ctx.storage
.with_connection(|conn| {
let block = get_block(conn, &name)?;
let final_content = block
.as_ref()
.map(|b| b.content.clone())
.unwrap_or_default();
let final_version = block.as_ref().map(|b| b.version).unwrap_or(0);
if block.is_none() {
return Ok(json!({"error": format!("block '{}' not found", name)}));
}
delete_block(conn, &name)?;
Ok(json!({
"success": true,
"name": name,
"final_content": final_content,
"final_version": final_version
}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
/// Handles a `memory_get_injection_prompt` call: renders the top search hits
/// for `query` as a Markdown prompt.
///
/// Parameters read from `params`:
/// - `query` (string, required)
/// - `token_budget` (default 2000); tokens are estimated as bytes / 4
/// - `include_types`: optional list of memory type names to keep
/// - `workspace`: optional search scope
///
/// If the full rendering fits the budget it is returned as-is. Otherwise each
/// memory's content is cut to an equal share of the remaining byte budget.
/// When that share is zero, content is left untruncated, so the result can
/// still exceed the budget.
///
/// Returns `{"prompt", "memory_count", "tokens_used"}`, or `{"error": ...}`
/// when `query` is missing or the search fails.
pub fn memory_get_injection_prompt(ctx: &HandlerContext, params: Value) -> Value {
    use crate::search::hybrid_search;
    use crate::types::SearchOptions;

    let query = match params.get("query").and_then(|v| v.as_str()) {
        Some(q) => q.to_string(),
        None => return json!({"error": "query is required"}),
    };
    let token_budget = params
        .get("token_budget")
        .and_then(|v| v.as_u64())
        .unwrap_or(2000) as usize;
    let include_types: Vec<String> = params
        .get("include_types")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(str::to_string))
                .collect()
        })
        .unwrap_or_default();
    let search_opts = SearchOptions {
        workspace: params
            .get("workspace")
            .and_then(|v| v.as_str())
            .map(str::to_string),
        limit: Some(20),
        ..Default::default()
    };
    // An embedding failure is tolerated: the search then runs without a vector.
    let query_embedding = ctx.embedder.embed(&query).ok();
    let embedding_ref = query_embedding.as_deref();
    let search_result = ctx.storage.with_connection(|conn| {
        hybrid_search(
            conn,
            &query,
            embedding_ref,
            &search_opts,
            &ctx.search_config,
        )
    });
    let memories = match search_result {
        Ok(results) => results,
        Err(e) => return json!({"error": e.to_string()}),
    };
    let memories: Vec<_> = if include_types.is_empty() {
        memories
    } else {
        memories
            .into_iter()
            .filter(|r| include_types.contains(&r.memory.memory_type.as_str().to_string()))
            .collect()
    };
    if memories.is_empty() {
        return json!({
            "prompt": "# Relevant Context\n\n*(No memories found)*",
            "memory_count": 0,
            "tokens_used": 0
        });
    }

    let header = "# Relevant Context\n\n";
    // Renders every memory as a Markdown block. `content_limit` caps each
    // memory's content in bytes; `None` or `Some(0)` leaves content untouched.
    let render_prompt = |content_limit: Option<usize>| -> String {
        let blocks: Vec<String> = memories
            .iter()
            .map(|r| {
                let m = &r.memory;
                let tags_str = m.tags.join(", ");
                let content = match content_limit {
                    Some(max) if max > 0 && m.content.len() > max => {
                        format!("{}β¦", safe_truncate(&m.content, max))
                    }
                    _ => m.content.clone(),
                };
                format!(
                    "## [{}] Memory #{}\nCreated: {} | Tags: {}\n\n{}\n\n---",
                    m.memory_type.as_str(),
                    m.id,
                    m.created_at.to_rfc3339(),
                    tags_str,
                    content
                )
            })
            .collect();
        format!("{}{}", header, blocks.join("\n\n"))
    };

    let full_prompt = render_prompt(None);
    let estimated_tokens = full_prompt.len() / 4;
    if estimated_tokens <= token_budget {
        return json!({
            "prompt": full_prompt,
            "memory_count": memories.len(),
            "tokens_used": estimated_tokens
        });
    }

    // Over budget: split what remains after fixed overhead evenly across
    // memories. `count` is non-zero because the empty case returned above.
    let count = memories.len();
    // Saturate so an absurdly large caller-supplied budget cannot overflow.
    let budget_chars = token_budget.saturating_mul(4);
    let separator_chars = "\n\n".len() * count.saturating_sub(1);
    // Rough allowance for each block's heading, metadata line, and rule.
    let overhead_per_block = 80usize;
    let total_overhead = header.len() + separator_chars + overhead_per_block * count;
    let chars_per_content = budget_chars.saturating_sub(total_overhead) / count;

    let final_prompt = render_prompt(Some(chars_per_content));
    let tokens_used = final_prompt.len() / 4;
    json!({
        "prompt": final_prompt,
        "memory_count": count,
        "tokens_used": tokens_used
    })
}
pub fn memory_observe_tool_use(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::queries::create_memory;
use crate::types::{CreateMemoryInput, MemoryType};
let tool_name = match params.get("tool_name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "tool_name is required"}),
};
let tool_input = match params.get("tool_input") {
Some(v) => v.clone(),
None => return json!({"error": "tool_input is required"}),
};
let tool_output = match params.get("tool_output").and_then(|v| v.as_str()) {
Some(o) => o.to_string(),
None => return json!({"error": "tool_output is required"}),
};
let session_id = params
.get("session_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let compress = params
.get("compress")
.and_then(|v| v.as_bool())
.unwrap_or(true);
let content = if compress {
let input_str = serde_json::to_string(&tool_input).unwrap_or_default();
let input_preview = if input_str.len() > 200 {
format!("{}β¦", safe_truncate(&input_str, 200))
} else {
input_str
};
let output_preview = if tool_output.len() > 200 {
format!("{}β¦", safe_truncate(&tool_output, 200))
} else {
tool_output.clone()
};
format!(
"[{}] inputβ{} outputβ{}",
tool_name, input_preview, output_preview
)
} else {
serde_json::to_string(&json!({
"tool_name": tool_name,
"input": tool_input,
"output": tool_output
}))
.unwrap_or_else(|_| format!("[{}] observation", tool_name))
};
let tags = vec![
"tool-observation".to_string(),
format!("session:{}", session_id),
tool_name.clone(),
];
let input = CreateMemoryInput {
content,
memory_type: MemoryType::Episodic,
tags,
workspace: Some("default".to_string()),
..Default::default()
};
let result = ctx
.storage
.with_transaction(|conn| create_memory(conn, &input));
match result {
Ok(memory) => json!({
"id": memory.id,
"compressed": compress
}),
Err(e) => json!({"error": e.to_string()}),
}
}
/// Handles a `memory_archive_tool_output` call: stores the full tool output in
/// the `"archive"` workspace and returns a short summary plus the archive id.
///
/// Parameters read from `params`:
/// - `tool_name` (string, required)
/// - `raw_output` (string, required)
/// - `session_id` (default `"unknown"`), recorded as a `session:<id>` tag
/// - `compress_summary` (default true): when false the summary is the raw output
/// - `summary_tokens` (default 500): summary budget, at 4 bytes per token
///
/// Returns `{"archive_id", "summary", "raw_tokens_estimate",
/// "summary_tokens_estimate", "compression_ratio"}`, or `{"error": ...}` when a
/// required parameter is missing or the archive cannot be stored.
pub fn memory_archive_tool_output(ctx: &HandlerContext, params: Value) -> Value {
    use crate::storage::queries::create_memory;
    use crate::types::{CreateMemoryInput, MemoryType};

    let tool_name = match params.get("tool_name").and_then(|v| v.as_str()) {
        Some(n) => n.to_string(),
        None => return json!({"error": "tool_name is required"}),
    };
    let raw_output = match params.get("raw_output").and_then(|v| v.as_str()) {
        Some(o) => o.to_string(),
        None => return json!({"error": "raw_output is required"}),
    };
    let session_id = params
        .get("session_id")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown")
        .to_string();
    let compress_summary = params
        .get("compress_summary")
        .and_then(|v| v.as_bool())
        .unwrap_or(true);
    let summary_tokens = params
        .get("summary_tokens")
        .and_then(|v| v.as_u64())
        .unwrap_or(500) as usize;

    let tags = vec![
        "tool-archive".to_string(),
        format!("session:{}", session_id),
        tool_name.clone(),
    ];
    let input = CreateMemoryInput {
        content: raw_output.clone(),
        memory_type: MemoryType::Episodic,
        tags,
        workspace: Some("archive".to_string()),
        ..Default::default()
    };
    // The archive is persisted first; the summary is derived afterwards.
    let archive_memory = match ctx
        .storage
        .with_transaction(|conn| create_memory(conn, &input))
    {
        Ok(m) => m,
        Err(e) => return json!({"error": e.to_string()}),
    };
    let archive_id = archive_memory.id;

    let summary = if compress_summary {
        // Saturate so an absurdly large caller-supplied token count cannot
        // overflow; the effect is simply "no truncation".
        let max_chars = summary_tokens.saturating_mul(4);
        let slice = safe_truncate(&raw_output, max_chars);
        // Snap back to the last sentence or line terminator in the slice.
        // NOTE(review): this also applies when nothing was truncated, so text
        // after the final terminator is dropped even for short outputs —
        // confirm that is intended.
        let boundary = slice
            .rfind(['.', '!', '?', '\n'])
            .map(|pos| pos + 1)
            .unwrap_or(slice.len());
        let trimmed = slice[..boundary].trim_end();
        format!("[{} summary] {}", tool_name, trimmed)
    } else {
        raw_output.clone()
    };

    let raw_tokens_estimate = raw_output.len() / 4;
    let summary_tokens_estimate = summary.len() / 4;
    // `max(1)` guards the division for very short outputs.
    let compression_ratio = summary_tokens_estimate as f64 / (raw_tokens_estimate.max(1)) as f64;
    json!({
        "archive_id": archive_id,
        "summary": summary,
        "raw_tokens_estimate": raw_tokens_estimate,
        "summary_tokens_estimate": summary_tokens_estimate,
        "compression_ratio": compression_ratio
    })
}
pub fn memory_get_archived_output(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::queries::get_memory;
let archive_id = match params.get("archive_id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "archive_id is required"}),
};
let memory = match ctx
.storage
.with_connection(|conn| match get_memory(conn, archive_id) {
Ok(m) => Ok(Some(m)),
Err(crate::error::EngramError::NotFound(_)) => Ok(None),
Err(e) => Err(e),
}) {
Ok(Some(m)) => m,
Ok(None) => return json!({"error": "Archive not found", "archive_id": archive_id}),
Err(e) => return json!({"error": e.to_string()}),
};
let tool_name = memory
.tags
.iter()
.find(|t| *t != "tool-archive" && !t.starts_with("session:"))
.cloned()
.unwrap_or_else(|| "unknown".to_string());
json!({
"archive_id": archive_id,
"tool_name": tool_name,
"content": memory.content,
"created_at": memory.created_at.to_rfc3339()
})
}
pub fn memory_get_working_memory(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::queries::list_memories;
use crate::types::{ListOptions, SortField, SortOrder};
use chrono::{Duration, Utc};
let session_id = match params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => return json!({"error": "session_id is required"}),
};
let token_budget = params
.get("token_budget")
.and_then(|v| v.as_u64())
.unwrap_or(4000) as usize;
let include_tool_names: Vec<String> = params
.get("include_tool_names")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
})
.unwrap_or_default();
let since_cutoff = params
.get("since_minutes")
.and_then(|v| v.as_u64())
.map(|mins| Utc::now() - Duration::minutes(mins as i64));
let session_tag = format!("session:{}", session_id);
let has_session_tag = |tags: &[String]| tags.contains(&session_tag);
let extract_tool_name = |tags: &[String], exclude_prefix: &str| -> String {
tags.iter()
.find(|t| *t != exclude_prefix && !t.starts_with("session:"))
.cloned()
.unwrap_or_else(|| "unknown".to_string())
};
let passes_tool_filter = |tags: &[String]| -> bool {
if include_tool_names.is_empty() {
return true;
}
tags.iter().any(|t| include_tool_names.contains(t))
};
let obs_options = ListOptions {
workspace: Some("default".to_string()),
tags: Some(vec!["tool-observation".to_string()]),
sort_by: Some(SortField::CreatedAt),
sort_order: Some(SortOrder::Asc),
limit: Some(1000),
..Default::default()
};
let all_observations = match ctx
.storage
.with_connection(|conn| list_memories(conn, &obs_options))
{
Ok(mems) => mems,
Err(e) => return json!({"error": e.to_string()}),
};
let observations: Vec<_> = all_observations
.into_iter()
.filter(|m| {
has_session_tag(&m.tags)
&& passes_tool_filter(&m.tags)
&& since_cutoff.is_none_or(|cutoff| m.created_at >= cutoff)
})
.collect();
let archive_options = ListOptions {
workspace: Some("archive".to_string()),
tags: Some(vec!["tool-archive".to_string()]),
sort_by: Some(SortField::CreatedAt),
sort_order: Some(SortOrder::Asc),
limit: Some(1000),
..Default::default()
};
let all_archives = match ctx
.storage
.with_connection(|conn| list_memories(conn, &archive_options))
{
Ok(mems) => mems,
Err(e) => return json!({"error": e.to_string()}),
};
let archives: Vec<_> = all_archives
.into_iter()
.filter(|m| {
has_session_tag(&m.tags)
&& passes_tool_filter(&m.tags)
&& since_cutoff.is_none_or(|cutoff| m.created_at >= cutoff)
})
.collect();
let archive_refs: Vec<Value> = archives
.iter()
.map(|m| {
let tool_name = extract_tool_name(&m.tags, "tool-archive");
json!({"id": m.id, "tool_name": tool_name})
})
.collect();
let archive_section: String = archives
.iter()
.map(|m| {
let tn = extract_tool_name(&m.tags, "tool-archive");
format!(
"**Archive ref:** [{}] ID={} β call `memory_get_archived_output` with archive_id={} to retrieve full output\n",
tn, m.id, m.id
)
})
.collect();
let archive_reserved = archive_section.len() / 4;
let obs_count = observations.len();
let content_budget_chars = (token_budget.saturating_sub(500 + archive_reserved)) * 4;
let chars_per_obs = content_budget_chars
.checked_div(obs_count)
.unwrap_or(content_budget_chars);
let mut md = format!(
"# Working Memory β Session {}\n\n## Tool Observations ({} total)\n\n",
session_id, obs_count
);
for (i, m) in observations.iter().enumerate() {
let tool_name = extract_tool_name(&m.tags, "tool-observation");
let content = if m.content.len() > chars_per_obs && chars_per_obs > 0 {
format!("{}β¦", safe_truncate(&m.content, chars_per_obs))
} else {
m.content.clone()
};
md.push_str(&format!(
"### {} (observation #{})\n{}\n\n---\n",
tool_name,
i + 1,
content
));
}
md.push_str(&archive_section);
let tokens_estimate = md.len() / 4;
json!({
"working_memory": md,
"observation_count": obs_count,
"archive_count": archives.len(),
"archive_refs": archive_refs,
"tokens_estimate": tokens_estimate
})
}
pub fn memory_block_history(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::memory_blocks::get_block_history;
let name = match params.get("name").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => return json!({"error": "name is required"}),
};
let limit = params.get("limit").and_then(|v| v.as_u64()).unwrap_or(20) as usize;
ctx.storage
.with_connection(|conn| {
let history = get_block_history(conn, &name, limit)?;
Ok(json!({"name": name, "history": history, "count": history.len()}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_prepare_context(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::integration_orchestrator::IntegrationOrchestrator;
use crate::search::hybrid_search;
use crate::types::SearchOptions;
let query = match params.get("query").and_then(|v| v.as_str()) {
Some(q) => q.to_string(),
None => return json!({"error": "query is required"}),
};
let budget = params
.get("budget")
.and_then(|v| v.as_u64())
.unwrap_or(4000) as usize;
let search_opts = SearchOptions {
workspace: params
.get("workspace")
.and_then(|v| v.as_str())
.map(str::to_string),
limit: Some(50),
..Default::default()
};
let query_embedding = ctx.embedder.embed(&query).ok();
let embedding_ref = query_embedding.as_deref();
let search_result = ctx.storage.with_connection(|conn| {
hybrid_search(
conn,
&query,
embedding_ref,
&search_opts,
&ctx.search_config,
)
});
let memories: Vec<_> = match search_result {
Ok(results) => results.into_iter().map(|r| r.memory).collect(),
Err(e) => return json!({"error": e.to_string()}),
};
let orchestrator = IntegrationOrchestrator::new();
match orchestrator.prepare_context_for_llm(&query, &memories, budget) {
Ok(prepared) => json!({
"context": prepared.context,
"token_count": prepared.token_count,
"groups_count": prepared.groups_count,
"memory_count": memories.len(),
}),
Err(e) => json!({"error": e.to_string()}),
}
}
/// Tests for `memory_build_context` (against in-memory storage) and for the
/// `safe_truncate` helper.
#[cfg(test)]
mod context_tests {
    use super::safe_truncate;

    /// Builds a handler context backed by in-memory storage and the default
    /// embedder configuration.
    fn test_ctx() -> super::super::HandlerContext {
        use crate::embedding::{create_embedder, EmbeddingCache};
        use crate::search::{AdaptiveCacheConfig, FuzzyEngine, SearchConfig, SearchResultCache};
        use crate::storage::Storage;
        use crate::types::EmbeddingConfig;
        use parking_lot::Mutex;
        use std::sync::Arc;
        let storage = Storage::open_in_memory().expect("in-memory storage");
        let embedder = create_embedder(&EmbeddingConfig::default()).expect("tfidf embedder");
        super::super::HandlerContext {
            storage,
            embedder,
            fuzzy_engine: Arc::new(Mutex::new(FuzzyEngine::new())),
            search_config: SearchConfig::default(),
            realtime: None,
            embedding_cache: Arc::new(EmbeddingCache::default()),
            search_cache: Arc::new(SearchResultCache::new(AdaptiveCacheConfig::default())),
            #[cfg(feature = "meilisearch")]
            meili: None,
            #[cfg(feature = "meilisearch")]
            meili_indexer: None,
            #[cfg(feature = "meilisearch")]
            meili_sync_interval: 60,
            #[cfg(feature = "langfuse")]
            langfuse_runtime: Arc::new(tokio::runtime::Runtime::new().expect("langfuse runtime")),
        }
    }

    /// Creates a memory through the `memory_create` handler and returns its id.
    fn seed_memory(ctx: &super::super::HandlerContext, content: &str, mem_type: &str) -> i64 {
        let result = super::super::dispatch(
            ctx,
            "memory_create",
            serde_json::json!({
                "content": content,
                "memory_type": mem_type,
                "workspace": "default"
            }),
        );
        result["id"].as_i64().expect("memory must be created")
    }

    /// Links two memories through the `memory_link` handler; the handler's
    /// response is not inspected.
    fn link_memories(ctx: &super::super::HandlerContext, from: i64, to: i64) {
        super::super::dispatch(
            ctx,
            "memory_link",
            serde_json::json!({
                "from_id": from,
                "to_id": to,
                "edge_type": "related_to",
                "score": 0.9
            }),
        );
    }

    #[test]
    fn test_build_context_backward_compat() {
        let ctx = test_ctx();
        seed_memory(&ctx, "Rust is a systems programming language", "note");
        let result =
            super::memory_build_context(&ctx, serde_json::json!({"query": "Rust programming"}));
        assert!(result.get("prompt").is_some(), "must have prompt");
        assert!(
            result.get("token_estimate").is_some(),
            "must have token_estimate"
        );
        assert!(
            result.get("memories_used").is_some(),
            "must have memories_used"
        );
        assert!(
            result.get("total_budget").is_some(),
            "must have total_budget"
        );
        assert_eq!(result["depth"], 1, "default depth must be 1");
        assert_eq!(
            result["timeframe"], "all",
            "default timeframe must be 'all'"
        );
        assert!(
            result.get("graph").is_none(),
            "graph should not be present by default"
        );
    }

    #[test]
    fn test_build_context_include_types_filters() {
        let ctx = test_ctx();
        seed_memory(&ctx, "Important decision about architecture", "decision");
        seed_memory(&ctx, "Remember to buy groceries", "note");
        let result = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "architecture decision groceries",
                "include_types": ["decision"]
            }),
        );
        let used = result["memories_used"].as_u64().unwrap_or(0);
        assert!(
            used <= 1,
            "should only include decision-type memories, got {}",
            used
        );
    }

    #[test]
    fn test_build_context_timeframe_filters() {
        let ctx = test_ctx();
        seed_memory(&ctx, "Recent memory about testing", "note");
        let result = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "testing",
                "timeframe": "1h"
            }),
        );
        assert_eq!(result["timeframe"], "1h");
        let used = result["memories_used"].as_u64().unwrap_or(0);
        assert!(
            used >= 1,
            "recently created memory should be found with 1h timeframe"
        );
    }

    #[test]
    fn test_build_context_depth_expansion() {
        let ctx = test_ctx();
        let id1 = seed_memory(&ctx, "Core concept about neural networks", "note");
        let id2 = seed_memory(&ctx, "Backpropagation algorithm details", "note");
        let id3 = seed_memory(&ctx, "Gradient descent optimization", "note");
        link_memories(&ctx, id1, id2);
        link_memories(&ctx, id2, id3);
        let result = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "neural networks",
                "depth": 2
            }),
        );
        assert_eq!(result["depth"], 2);
        let deep_used = result["memories_used"].as_u64().unwrap_or(0);
        let result_shallow = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "neural networks",
                "depth": 1
            }),
        );
        let shallow_used = result_shallow["memories_used"].as_u64().unwrap_or(0);
        assert!(
            deep_used >= shallow_used,
            "depth=2 ({}) should find >= depth=1 ({})",
            deep_used,
            shallow_used
        );
    }

    #[test]
    fn test_build_context_include_graph() {
        let ctx = test_ctx();
        let id1 = seed_memory(&ctx, "Graph data structures", "note");
        let id2 = seed_memory(&ctx, "Adjacency list representation", "note");
        link_memories(&ctx, id1, id2);
        let result = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "graph data structures",
                "include_graph": true
            }),
        );
        assert!(
            result.get("graph").is_some(),
            "graph must be present when include_graph=true"
        );
        let graph = &result["graph"];
        assert!(graph.get("edges").is_some(), "graph must have edges");
        assert!(
            graph.get("node_count").is_some(),
            "graph must have node_count"
        );
    }

    #[test]
    fn test_build_context_depth_clamped_to_max() {
        let ctx = test_ctx();
        seed_memory(&ctx, "Testing depth clamping", "note");
        let result = super::memory_build_context(
            &ctx,
            serde_json::json!({
                "query": "depth clamping",
                "depth": 10
            }),
        );
        assert_eq!(result["depth"], 3, "depth should be clamped to max 3");
    }

    #[test]
    fn test_safe_truncate_ascii() {
        assert_eq!(safe_truncate("hello world", 5), "hello");
    }

    #[test]
    fn test_safe_truncate_within_limit() {
        assert_eq!(safe_truncate("hi", 100), "hi");
    }

    #[test]
    fn test_safe_truncate_empty() {
        assert_eq!(safe_truncate("", 10), "");
    }

    #[test]
    fn test_safe_truncate_multibyte_emoji() {
        // A four-byte emoji followed by ASCII; written as an escape so the test
        // does not depend on the file's encoding surviving intact.
        let s = "\u{1F600}hello";
        let result = safe_truncate(s, 5);
        assert!(
            s.is_char_boundary(result.len()),
            "result must end on char boundary"
        );
        assert!(
            !result.contains('\u{FFFD}'),
            "must not produce replacement chars"
        );
    }

    #[test]
    fn test_safe_truncate_multibyte_cjk() {
        // Three three-byte CJK characters; a 4-byte limit falls inside the
        // second one, so only the first may be kept.
        let s = "\u{65E5}\u{672C}\u{8A9E}";
        let result = safe_truncate(s, 4);
        assert!(s.is_char_boundary(result.len()));
        assert!(result == "\u{65E5}" || result.is_empty());
    }

    #[test]
    fn test_safe_truncate_exact_boundary() {
        let s = "abcdef";
        assert_eq!(safe_truncate(s, 3), "abc");
    }

    #[test]
    fn test_safe_truncate_zero() {
        assert_eq!(safe_truncate("hello", 0), "");
    }
}