use serde_json::{json, Value};
use super::HandlerContext;
use crate::storage::enrichment_events::{emit_best_effort, EnrichmentEvent};
pub fn memory_compress(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::compression_semantic::{CompressionConfig, SemanticCompressor};
use crate::storage::queries::get_memory;
let id = match params.get("id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "id is required"}),
};
let target_ratio: f32 = params
.get("target_ratio")
.and_then(|v| v.as_f64())
.map(|v| v as f32)
.unwrap_or(0.1);
ctx.storage
.with_connection(|conn| {
let memory = get_memory(conn, id)?;
let config = CompressionConfig {
target_ratio,
..CompressionConfig::default()
};
let compressor = SemanticCompressor::new(config);
let result = compressor.compress(&memory.content);
Ok(json!({
"memory_id": id,
"original_tokens": result.original_tokens,
"compressed_tokens": result.compressed_tokens,
"compression_ratio": result.ratio,
"structured_content": result.structured_content,
"key_entities": result.key_entities,
"key_facts": result.key_facts,
}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_decompress(ctx: &HandlerContext, params: Value) -> Value {
use crate::storage::queries::get_memory;
let id = match params.get("id").and_then(|v| v.as_i64()) {
Some(id) => id,
None => return json!({"error": "id is required"}),
};
ctx.storage
.with_connection(|conn| {
let memory = get_memory(conn, id)?;
Ok(json!({
"memory_id": id,
"content": memory.content,
"status": "ok",
}))
})
.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_compress_for_context(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::context_compression::{ContextCompressor, MemoryInput};
use crate::storage::queries::get_memory;
let ids: Vec<i64> = params
.get("ids")
.or_else(|| params.get("memory_ids"))
.and_then(|v| v.as_array())
.map(|arr| arr.iter().filter_map(|v| v.as_i64()).collect())
.unwrap_or_default();
if ids.is_empty() {
return json!({"error": "ids array is required and must not be empty"});
}
let token_budget: usize = params
.get("token_budget")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.unwrap_or(4096);
let result = ctx.storage.with_connection(|conn| {
let mut inputs: Vec<MemoryInput> = Vec::new();
for &id in &ids {
if let Ok(m) = get_memory(conn, id) {
inputs.push(MemoryInput {
id: m.id,
content: m.content,
importance: m.importance,
});
}
}
let mut compressor = ContextCompressor::new(token_budget);
let compression = compressor.compress_for_context_with_diagnostics(&inputs);
let total_tokens: usize = compression.entries.iter().map(|e| e.tokens_used).sum();
Ok(json!({
"token_budget": token_budget,
"memories_input": inputs.len(),
"memories_included": compression.entries.len(),
"memories_skipped": compression.skipped_ids.len(),
"total_tokens": total_tokens,
"budget_used": compression.budget.used,
"budget_remaining": compression.budget.remaining,
"skipped_memory_ids": compression.skipped_ids,
"entries": compression.entries,
}))
});
result.unwrap_or_else(|e| json!({"error": e.to_string()}))
}
pub fn memory_consolidate(ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::consolidation_offline::{
ConsolidationConfig, GroupingStrategy, OfflineConsolidator,
};
let workspace = params
.get("workspace")
.and_then(|v| v.as_str())
.unwrap_or("default");
let strategy_str = params
.get("strategy")
.and_then(|v| v.as_str())
.unwrap_or("content_overlap");
let dry_run = params
.get("dry_run")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let strategy = match strategy_str {
"tag_similarity" => GroupingStrategy::TagSimilarity,
"temporal_proximity" => GroupingStrategy::TemporalProximity,
_ => GroupingStrategy::ContentOverlap,
};
let result = ctx.storage.with_connection(|conn| {
let config = ConsolidationConfig::default();
let consolidator = OfflineConsolidator::new(config);
let report = consolidator.consolidate_with_strategy(conn, workspace, strategy)?;
if !dry_run && report.memories_merged > 0 {
let operation_id = uuid::Uuid::new_v4().to_string();
emit_best_effort(
conn,
&EnrichmentEvent {
operation_id: &operation_id,
event_type: "consolidation",
memory_id: None,
version_id: None,
triggered_by: "memory_consolidate",
agent_id: None,
workspace: Some(workspace),
params: json!({"strategy": strategy_str}),
outcome: json!({
"groups_found": report.groups_found,
"memories_merged": report.memories_merged,
"memories_archived": report.memories_archived,
"tokens_saved": report.tokens_saved,
}),
status: "completed",
dry_run,
},
);
}
Ok(json!({
"workspace": workspace,
"strategy": strategy_str,
"dry_run": dry_run,
"groups_found": report.groups_found,
"memories_merged": report.memories_merged,
"memories_archived": report.memories_archived,
"tokens_before": report.tokens_before,
"tokens_after": report.tokens_after,
"tokens_saved": report.tokens_saved,
}))
});
result.unwrap_or_else(|e| {
let err_str = e.to_string();
let _ = ctx.storage.with_connection(|conn| {
let op_id = uuid::Uuid::new_v4().to_string();
emit_best_effort(
conn,
&EnrichmentEvent {
operation_id: &op_id,
event_type: "consolidation",
memory_id: None,
version_id: None,
triggered_by: "memory_consolidate",
agent_id: None,
workspace: Some(workspace),
params: json!({"strategy": strategy_str}),
outcome: json!({"error": &err_str}),
status: "failed",
dry_run,
},
);
Ok::<_, crate::error::EngramError>(())
});
json!({"error": err_str})
})
}
pub fn memory_synthesis(_ctx: &HandlerContext, params: Value) -> Value {
use crate::intelligence::synthesis::{SynthesisConfig, SynthesisEngine, SynthesisStrategy};
let content_a = match params.get("content_a").and_then(|v| v.as_str()) {
Some(c) => c.to_string(),
None => return json!({"error": "content_a is required"}),
};
let content_b = match params.get("content_b").and_then(|v| v.as_str()) {
Some(c) => c.to_string(),
None => return json!({"error": "content_b is required"}),
};
let id_a: i64 = params.get("id_a").and_then(|v| v.as_i64()).unwrap_or(0);
let strategy_str = params
.get("strategy")
.and_then(|v| v.as_str())
.unwrap_or("merge");
let strategy = match strategy_str {
"replace" => SynthesisStrategy::Replace,
"append" => SynthesisStrategy::Append,
_ => SynthesisStrategy::Merge,
};
let mut engine = SynthesisEngine::new(SynthesisConfig::default());
engine.add_to_buffer(id_a, &content_a);
match engine.check_and_synthesize(&content_b, strategy) {
Some(synth) => json!({
"overlap_detected": true,
"overlap_score": synth.overlap_score,
"strategy_used": format!("{:?}", synth.strategy_used),
"synthesized_content": synth.content,
"source_ids": synth.sources,
"tokens_saved": synth.tokens_saved,
}),
None => json!({
"overlap_detected": false,
"message": "No significant overlap detected between the two contents",
}),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::embedding::{create_embedder, EmbeddingCache};
use crate::search::{AdaptiveCacheConfig, FuzzyEngine, SearchConfig, SearchResultCache};
use crate::storage::Storage;
use crate::types::EmbeddingConfig;
use parking_lot::Mutex;
use serde_json::json;
use std::sync::Arc;
fn make_ctx() -> HandlerContext {
let storage = Storage::open_in_memory().expect("test storage");
HandlerContext {
storage,
embedder: create_embedder(&EmbeddingConfig::default()).expect("embedder"),
fuzzy_engine: Arc::new(Mutex::new(FuzzyEngine::new())),
search_config: SearchConfig::default(),
realtime: None,
embedding_cache: Arc::new(EmbeddingCache::default()),
search_cache: Arc::new(SearchResultCache::new(AdaptiveCacheConfig::default())),
#[cfg(feature = "meilisearch")]
meili: None,
#[cfg(feature = "meilisearch")]
meili_indexer: None,
#[cfg(feature = "meilisearch")]
meili_sync_interval: 60,
#[cfg(feature = "langfuse")]
langfuse_runtime: Arc::new(tokio::runtime::Runtime::new().expect("langfuse runtime")),
}
}
#[test]
fn test_memory_consolidate_emits_completed_event_on_success() {
let ctx = make_ctx();
ctx.storage
.with_transaction(|conn| {
conn.execute(
"INSERT INTO memories (content, memory_type, workspace, importance, access_count)
VALUES ('Rust ownership rules explained', 'general', 'default', 0.5, 0)",
[],
)?;
conn.execute(
"INSERT INTO memories (content, memory_type, workspace, importance, access_count)
VALUES ('Rust ownership rules overview', 'general', 'default', 0.5, 0)",
[],
)?;
Ok(())
})
.expect("seed memories");
let result = memory_consolidate(
&ctx,
json!({"workspace": "default", "strategy": "content_overlap", "dry_run": false}),
);
assert!(
result.get("error").is_none(),
"memory_consolidate error: {result}"
);
assert!(result["workspace"].as_str().is_some());
}
#[test]
fn test_memory_consolidate_emits_failed_event_on_error() {
let ctx = make_ctx();
ctx.storage
.with_connection(|conn| {
conn.execute_batch("DROP TABLE IF EXISTS memories;")?;
Ok(())
})
.expect("drop memories table");
let result = memory_consolidate(
&ctx,
json!({"workspace": "default", "strategy": "content_overlap", "dry_run": false}),
);
assert!(
result.get("error").is_some(),
"expected an error response after dropping memories table, got: {result}"
);
let count: i32 = ctx
.storage
.with_connection(|conn| {
Ok(conn.query_row(
"SELECT COUNT(*) FROM enrichment_events
WHERE triggered_by='memory_consolidate' AND status='failed'",
[],
|r| r.get(0),
)?)
})
.unwrap();
assert!(
count > 0,
"memory_consolidate error must emit status=failed enrichment event"
);
}
}