use chrono::{DateTime, Utc};
use dirs;
use serde_json;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use uuid::Uuid;
use crate::config::{ChromaConfig, GroqLlmConfig, JinaEmbedderConfig};
use crate::embeddings::jina::JinaEmbedding;
use crate::engine::QueryBuilder;
use crate::engine::StorageBuilder;
use crate::llm::groq::GroqLlm;
use crate::storage::SqliteManager;
use crate::vector_store::chroma::ChromaDB;
use crate::core::{
ChangeLog, ChangeType, FactId, FactOperation, IngestionOutcome, ModificationResult,
RetrievalResult, ScopeFilter, ScopeIdentifiers, StoredFact,
};
use crate::engine::config::{EngineConfig, ExtractionConfig, GraphConfig};
use crate::error::Result;
use crate::extraction::{LlmFactConsolidator, LlmFactExtractor};
use crate::search::semantic::SemanticSearch;
use crate::search::{MemoryKind, MemorySearch, SearchRequest};
/// Primary entry point for the memory engine.
///
/// Cheap to clone: both fields are reference-counted, so clones share the
/// same configuration and searcher. Builders (`store`, `search`) rely on
/// this by taking `self.clone()`.
#[derive(Clone)]
pub struct NeomemxEngine {
// Shared providers, backends, and stores for the engine.
config: Arc<EngineConfig>,
// Search strategy used by `search_internal`; wired to `SemanticSearch`
// in `with_config`.
searcher: Arc<dyn MemorySearch>,
}
impl NeomemxEngine {
/// Builds an engine from the default configuration.
///
/// # Errors
/// Propagates any failure from constructing the default `EngineConfig`.
pub async fn new() -> Result<Self> {
    let config = EngineConfig::default().await?;
    Self::with_config(config).await
}
/// Builds an engine around the supplied configuration, wiring a semantic
/// searcher over its embedding provider, vector backend, and optional
/// reranker.
pub async fn with_config(config: EngineConfig) -> Result<Self> {
    let config = Arc::new(config);
    let semantic = SemanticSearch::new(
        config.embedding_provider.clone(),
        config.vector_backend.clone(),
        config.reranker.clone(),
    );
    let searcher: Arc<dyn MemorySearch> = Arc::new(semantic);
    Ok(Self { config, searcher })
}
/// Returns a builder for constructing a customized engine.
pub fn builder() -> crate::engine::EngineBuilder {
crate::engine::EngineBuilder::new()
}
/// Starts a fluent ingestion of `content`; call `.execute()` on the
/// returned builder to persist.
pub fn store(&self, content: impl Into<String>) -> StorageBuilder {
StorageBuilder::new(self.clone(), content)
}
/// Starts a fluent search for `query`; configure scope/limit on the
/// returned builder, then `.execute()`.
pub fn search(&self, query: impl Into<String>) -> QueryBuilder {
QueryBuilder::new(self.clone()).query(query)
}
/// Starts a query with no text filter — a plain listing builder.
pub fn retrieve_all(&self) -> QueryBuilder {
QueryBuilder::new(self.clone())
}
/// Replaces the content of an existing fact in place.
///
/// Flow: verify the fact exists and belongs to `scope`, re-embed the new
/// content, write the updated payload back to the vector store, then
/// record an `Updated` entry in the history store.
///
/// # Errors
/// `MemoryNotFound` if `fact_id` is unknown; a validation error if the
/// stored fact's scope differs from `scope`; otherwise any backend error.
pub async fn update(
&self,
fact_id: &FactId,
new_content: impl Into<String>,
scope: ScopeIdentifiers,
) -> Result<ModificationResult> {
scope.validate()?;
// Two lookups: the raw vector-store record, then its decoded fact form.
let existing = self
.config
.vector_backend
.get(fact_id)
.await?
.ok_or_else(|| crate::error::NeomemxError::MemoryNotFound(fact_id.clone()))?;
let existing_fact = self
.output_to_fact(&existing)
.await?
.ok_or_else(|| crate::error::NeomemxError::MemoryNotFound(fact_id.clone()))?;
// Callers may only update facts within the scope they supplied.
if existing_fact.scope != scope {
return Err(crate::error::NeomemxError::validation(
"Fact scope does not match",
crate::error::ErrorCode::ValidationRequired,
None,
None,
));
}
let new_content = new_content.into();
let previous_content = existing_fact.content.clone();
let embedding = self.config.embedding_provider.embed(&new_content).await?;
// Rebuild the payload from the decoded fact, then overwrite the
// content and bump the update timestamp.
let mut payload = self.fact_to_payload(&existing_fact);
payload.insert("data".to_string(), serde_json::json!(&new_content));
payload.insert(
"updated_at".to_string(),
serde_json::json!(Utc::now().to_rfc3339()),
);
// NOTE(review): the "hash" payload key is NOT recomputed here, so it
// still reflects the previous content — confirm whether content_hash
// should be refreshed on update.
self.config
.vector_backend
.update(fact_id, Some(embedding), Some(payload))
.await?;
// History is recorded after the vector write succeeds.
let change = ChangeLog::new(fact_id, ChangeType::Updated, scope)
.with_previous_content(&previous_content)
.with_new_content(&new_content);
self.config.history_store.record_change(change).await?;
Ok(ModificationResult::new(
fact_id,
previous_content,
new_content,
))
}
/// Deletes a fact after verifying it belongs to `scope`, removes any
/// graph relationships for it, and records a `Deleted` history entry.
///
/// If the fact is absent from the vector store, the delete and the
/// history entry still proceed (best-effort removal).
///
/// # Errors
/// A validation error if the stored fact's scope differs from `scope`;
/// otherwise any backend error.
pub async fn delete(&self, fact_id: &FactId, scope: ScopeIdentifiers) -> Result<()> {
    scope.validate()?;
    // Scope check only applies when the fact can actually be decoded.
    let output = self.config.vector_backend.get(fact_id).await?;
    if let Some(output) = output {
        let decoded = self.output_to_fact(&output).await?;
        if let Some(fact) = decoded {
            if fact.scope != scope {
                return Err(crate::error::NeomemxError::validation(
                    "Fact scope does not match",
                    crate::error::ErrorCode::ValidationRequired,
                    None,
                    None,
                ));
            }
        }
    }
    self.config.vector_backend.delete(fact_id).await?;
    if let Some(graph) = self.config.graph_backend.as_ref() {
        graph.delete_fact_graph(fact_id).await?;
    }
    self.config
        .history_store
        .record_change(ChangeLog::new(fact_id, ChangeType::Deleted, scope))
        .await?;
    Ok(())
}
/// Deletes every fact in `scope` from the vector store and returns the
/// number of facts removed.
///
/// NOTE(review): only up to 10_000 facts are listed and deleted per
/// call, so larger scopes are cleared partially — confirm whether this
/// should loop until the listing is empty.
/// NOTE(review): unlike `delete`, this neither removes graph
/// relationships nor records `Deleted` history entries — verify that is
/// intentional.
pub async fn clear(&self, scope: ScopeIdentifiers) -> Result<usize> {
scope.validate()?;
let filter = ScopeFilter::from_scope(&scope);
let filters = filter.to_map();
let outputs = self
.config
.vector_backend
.list(Some(filters), 10000)
.await?;
let ids: Vec<String> = outputs.into_iter().map(|o| o.id).collect();
// Some backends reject empty batch deletes, so skip the call entirely.
if !ids.is_empty() {
self.config.vector_backend.delete_batch(&ids).await?;
}
let count = ids.len();
Ok(count)
}
/// Returns the full change log recorded for `fact_id`.
pub async fn history(&self, fact_id: &FactId) -> Result<Vec<ChangeLog>> {
self.config.history_store.get_history(fact_id).await
}
/// Fetches a single fact by id, or `None` when the vector store has no
/// record for it.
pub async fn get(&self, fact_id: &FactId) -> Result<Option<StoredFact>> {
    match self.config.vector_backend.get(fact_id).await? {
        Some(output) => self.output_to_fact(&output).await,
        None => Ok(None),
    }
}
/// Returns facts connected to `fact_id` in the knowledge graph, walking
/// relationships up to `max_depth` and restricting results to `scope`.
/// Yields an empty list when no graph backend is configured.
pub async fn get_related(
    &self,
    fact_id: &FactId,
    scope: ScopeIdentifiers,
    max_depth: usize,
) -> Result<Vec<StoredFact>> {
    scope.validate()?;
    // Guard clause: no graph backend means no relationships to walk.
    let graph = match self.config.graph_backend.as_ref() {
        Some(g) => g,
        None => return Ok(Vec::new()),
    };
    let filter = ScopeFilter::from_scope(&scope);
    graph
        .get_related_facts(fact_id, max_depth, Some(&filter))
        .await
}
/// Core ingestion path used by `StorageBuilder`.
///
/// With extraction enabled (both the per-call flag and the engine-level
/// `extraction_config.enabled`), `content` is run through the LLM fact
/// extractor, consolidated against facts already stored in `scope`, and
/// the resulting add/update/delete plan is applied. Otherwise `content`
/// is stored verbatim as a single fact.
///
/// Returns an `IngestionOutcome` describing every created, updated, and
/// consolidated (deleted) fact.
pub(crate) async fn store_internal(
&self,
content: &str,
scope: ScopeIdentifiers,
metadata: Option<HashMap<String, serde_json::Value>>,
enable_extraction: bool,
enable_graph: bool,
) -> Result<IngestionOutcome> {
let mut outcome = IngestionOutcome::new();
if enable_extraction && self.config.extraction_config.enabled {
let extracted = self
.config
.fact_extractor
.extract(
content,
self.config.extraction_config.extraction_prompt.as_deref(),
)
.await?;
// Nothing extractable from the input — ingest nothing at all.
if extracted.is_empty() {
return Ok(outcome);
}
// Load existing facts in scope so the consolidator can decide
// what to add, update, or retire.
// NOTE(review): only the first 100 existing facts are considered
// here — confirm this cap is intentional for large scopes.
let scope_filter = ScopeFilter::from_scope(&scope);
let existing_outputs = self
.config
.vector_backend
.list(Some(scope_filter.to_map()), 100)
.await?;
let mut existing_facts = Vec::new();
for output in existing_outputs {
if let Some(fact) = self.output_to_fact(&output).await? {
existing_facts.push(fact);
}
}
let consolidation = self
.config
.fact_consolidator
.consolidate(
&extracted,
&existing_facts,
self.config
.extraction_config
.consolidation_prompt
.as_deref(),
)
.await?;
// Apply the plan: creations first, then in-place updates, then
// deletions. Each step records its own history entry.
for fact_text in consolidation.to_add {
let fact_id = self
.create_fact(&fact_text, &scope, metadata.clone(), enable_graph)
.await?;
outcome.add_created(
fact_id.clone(),
FactOperation::new(&fact_id, &fact_text, ChangeType::Created),
);
}
for (fact_id, new_content) in consolidation.to_update {
let embedding = self.config.embedding_provider.embed(&new_content).await?;
// Reuse the existing payload if the record is still present;
// otherwise start from an empty payload.
let existing_output = self.config.vector_backend.get(&fact_id).await?;
let mut payload = if let Some(ref output) = existing_output {
output.payload.clone()
} else {
HashMap::new()
};
payload.insert("data".to_string(), serde_json::json!(&new_content));
payload.insert(
"updated_at".to_string(),
serde_json::json!(Utc::now().to_rfc3339()),
);
// NOTE(review): as in `update`, the "hash" payload key is not
// refreshed for the new content — confirm intended.
self.config
.vector_backend
.update(&fact_id, Some(embedding), Some(payload))
.await?;
let change = ChangeLog::new(&fact_id, ChangeType::Updated, scope.clone())
.with_new_content(&new_content);
self.config.history_store.record_change(change).await?;
outcome.add_updated(
fact_id.clone(),
FactOperation::new(&fact_id, &new_content, ChangeType::Updated),
);
}
for fact_id in consolidation.to_delete {
self.config.vector_backend.delete(&fact_id).await?;
if let Some(ref graph) = self.config.graph_backend {
graph.delete_fact_graph(&fact_id).await?;
}
let change = ChangeLog::new(&fact_id, ChangeType::Deleted, scope.clone());
self.config.history_store.record_change(change).await?;
outcome.add_consolidated(
fact_id.clone(),
FactOperation::new(&fact_id, "", ChangeType::Deleted),
);
}
} else {
// Extraction disabled: store the raw content as a single fact.
let fact_id = self
.create_fact(content, &scope, metadata, enable_graph)
.await?;
outcome.add_created(
fact_id.clone(),
FactOperation::new(&fact_id, content, ChangeType::Created),
);
}
Ok(outcome)
}
/// Runs a semantic search for `query` within `scope`, optionally
/// reranking the hits and expanding them with graph-related facts.
pub(crate) async fn search_internal(
    &self,
    query: &str,
    scope: ScopeIdentifiers,
    limit: usize,
    filters: Option<HashMap<String, serde_json::Value>>,
    rerank: bool,
    graph_expansion: bool,
) -> Result<RetrievalResult> {
    let request = SearchRequest::new(query, scope.clone(), limit)
        .with_filters(filters.unwrap_or_default())
        .with_rerank(rerank)
        .with_kind(MemoryKind::Semantic);
    let facts = self.searcher.search(request).await?.facts;

    // Graph expansion requires both the per-call flag and an engine-level
    // graph config; without a configured backend it is silently skipped.
    let mut related = Vec::new();
    if graph_expansion && self.config.graph_config.enabled {
        if let Some(graph) = self.config.graph_backend.as_ref() {
            let scope_filter = ScopeFilter::from_scope(&scope);
            for fact in &facts {
                let neighbors = graph
                    .get_related_facts(
                        &fact.id,
                        self.config.graph_config.max_relationship_depth,
                        Some(&scope_filter),
                    )
                    .await?;
                related.extend(neighbors);
            }
        }
    }
    Ok(RetrievalResult::new(facts).with_related(related))
}
/// Lists facts in `scope`, optionally narrowed by extra payload filters,
/// up to `limit` results. No semantic ranking is applied.
pub(crate) async fn list_internal(
    &self,
    scope: ScopeIdentifiers,
    limit: usize,
    filters: Option<HashMap<String, serde_json::Value>>,
) -> Result<RetrievalResult> {
    let mut scope_filter = ScopeFilter::from_scope(&scope);
    // Caller-supplied filters are merged on top of the scope filter.
    if let Some(extra) = filters {
        scope_filter.additional.extend(extra);
    }
    let outputs = self
        .config
        .vector_backend
        .list(Some(scope_filter.to_map()), limit)
        .await?;
    let mut facts = Vec::with_capacity(outputs.len());
    for output in outputs {
        if let Some(fact) = self.output_to_fact(&output).await? {
            facts.push(fact);
        }
    }
    Ok(RetrievalResult::new(facts))
}
/// Persists `content` as a brand-new fact: embeds it, inserts it into
/// the vector store, optionally builds graph relationships from
/// extracted entities, and records a `Created` history entry.
async fn create_fact(
    &self,
    content: &str,
    scope: &ScopeIdentifiers,
    metadata: Option<HashMap<String, serde_json::Value>>,
    enable_graph: bool,
) -> Result<FactId> {
    let fact_id = Uuid::new_v4().to_string();
    let now = Utc::now();

    // Assemble the fact record before persisting anything.
    let mut fact = StoredFact::new(&fact_id, content, scope.clone());
    fact.created_at = now;
    fact.updated_at = now;
    if let Some(meta) = metadata {
        fact.metadata = meta;
    }

    let embedding = self.config.embedding_provider.embed(content).await?;
    let payload = self.fact_to_payload(&fact);
    self.config
        .vector_backend
        .insert(
            vec![embedding],
            Some(vec![payload]),
            Some(vec![fact_id.clone()]),
        )
        .await?;

    // Graph linkage is opt-in twice: per call and via engine config.
    if enable_graph && self.config.graph_config.enabled {
        if let Some(graph) = self.config.graph_backend.as_ref() {
            let entities = graph.extract_entities(content).await?;
            if !entities.is_empty() {
                graph.build_relationships(&entities, &fact_id).await?;
            }
        }
    }

    let change =
        ChangeLog::new(&fact_id, ChangeType::Created, scope.clone()).with_new_content(content);
    self.config.history_store.record_change(change).await?;
    Ok(fact_id)
}
async fn output_to_fact(
&self,
output: &crate::vector_store::OutputData,
) -> Result<Option<StoredFact>> {
let content = output.get_data().unwrap_or_default();
let hash = output.get_string("hash").unwrap_or_default();
let created_at_str = output.get_string("created_at").unwrap_or_default();
let updated_at_str = output.get_string("updated_at").unwrap_or_default();
let created_at = DateTime::parse_from_rfc3339(&created_at_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let scope = crate::core::ScopeIdentifiers {
user: output.get_string("user_id"),
agent: output.get_string("agent_id"),
session: output.get_string("session_id"),
};
let mut metadata = HashMap::new();
let core_keys = [
"data",
"hash",
"created_at",
"updated_at",
"user_id",
"agent_id",
"session_id",
];
for (k, v) in &output.payload {
if !core_keys.contains(&k.as_str()) {
metadata.insert(k.clone(), v.clone());
}
}
Ok(Some(StoredFact {
id: output.id.clone(),
content,
scope,
embedding: None,
created_at,
updated_at,
content_hash: hash,
metadata,
relevance_score: output.score,
}))
}
/// Flattens a `StoredFact` into the key/value payload stored alongside
/// its vector: scope identifiers first, then the engine-managed core
/// keys, then user metadata (which is inserted last and can therefore
/// shadow earlier keys).
fn fact_to_payload(&self, fact: &StoredFact) -> HashMap<String, serde_json::Value> {
    let mut payload = fact.scope.to_filter_map();
    payload.insert("data".to_string(), serde_json::json!(fact.content));
    payload.insert("hash".to_string(), serde_json::json!(fact.content_hash));
    payload.insert(
        "created_at".to_string(),
        serde_json::json!(fact.created_at.to_rfc3339()),
    );
    payload.insert(
        "updated_at".to_string(),
        serde_json::json!(fact.updated_at.to_rfc3339()),
    );
    payload.extend(
        fact.metadata
            .iter()
            .map(|(key, value)| (key.clone(), value.clone())),
    );
    payload
}
}
impl EngineConfig {
    /// Builds the default engine configuration: Groq LLM, Jina embeddings,
    /// a Chroma vector store, and a SQLite-backed history store under
    /// `~/.neomemx/history.db` (falling back to `/tmp/neomemx/history.db`
    /// when the home directory cannot be determined).
    ///
    /// Graph support and reranking are disabled by default.
    ///
    /// NOTE: despite the name, this is an async, fallible constructor and
    /// intentionally does not implement the `Default` trait.
    ///
    /// # Errors
    /// Returns an error if any provider or the SQLite manager fails to
    /// initialize.
    pub async fn default() -> Result<Self> {
        let history_path = dirs::home_dir()
            .map(|p| p.join(".neomemx").join("history.db"))
            .unwrap_or_else(|| PathBuf::from("/tmp/neomemx/history.db"));
        let llm = Arc::new(GroqLlm::new(GroqLlmConfig::default())?);
        let embedding = Arc::new(JinaEmbedding::new(JinaEmbedderConfig::default())?);
        let vector_store = Arc::new(ChromaDB::new(ChromaConfig::default()).await?);
        // The SQLite manager doubles as the history store; the previous
        // `db.clone()` was a redundant Arc bump (the handle was never used
        // again), so the Arc is stored directly.
        let history_store = Arc::new(SqliteManager::new(&history_path)?);
        let fact_extractor = Arc::new(LlmFactExtractor::new(llm.clone()));
        let fact_consolidator = Arc::new(LlmFactConsolidator::new(llm.clone()));
        Ok(Self {
            llm_provider: llm,
            embedding_provider: embedding,
            vector_backend: vector_store,
            graph_backend: None,
            history_store,
            fact_extractor,
            fact_consolidator,
            reranker: None,
            // Field-init shorthand via direct construction replaces the
            // redundant `extraction_config: extraction_config` pattern.
            extraction_config: ExtractionConfig::default(),
            graph_config: GraphConfig::default(),
        })
    }
}
// Integration-style tests over a test engine (see `test_helpers::test_engine`).
// Extraction is disabled throughout so stored content is persisted verbatim.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::ScopeIdentifiers;
use crate::test_helpers::test_engine;
// Updating a fact returns both old and new content and appends an
// `Updated` entry after the original `Created` one.
#[tokio::test]
async fn store_and_update_tracks_history_and_payload() {
let engine = test_engine(false).await;
let scope = ScopeIdentifiers::for_user("u1");
let out = engine
.store("hello")
.with_scope(scope.clone())
.enable_extraction(false)
.execute()
.await
.unwrap();
let id = &out.operations[0].fact_id;
let result = engine
.update(id, "hello world", scope.clone())
.await
.unwrap();
assert_eq!(result.previous_content, "hello");
assert_eq!(result.new_content, "hello world");
let history = engine.history(id).await.unwrap();
assert_eq!(history.len(), 2);
// NOTE(review): assumes `history` returns entries in chronological
// order — confirm the history store guarantees ordering.
assert!(matches!(history[0].change_type, ChangeType::Created));
assert!(matches!(history[1].change_type, ChangeType::Updated));
}
// Deleting within the matching scope succeeds and leaves a `Deleted`
// entry in the change log.
#[tokio::test]
async fn delete_respects_scope_and_clears_vector_store() {
let engine = test_engine(false).await;
let scope = ScopeIdentifiers::for_user("u2");
let out = engine
.store("fact")
.with_scope(scope.clone())
.enable_extraction(false)
.execute()
.await
.unwrap();
let id = &out.operations[0].fact_id;
engine.delete(id, scope.clone()).await.unwrap();
let after = engine.history(id).await.unwrap();
assert!(after.iter().any(|c| c.change_type == ChangeType::Deleted));
}
// Search results never leak facts from a different user's scope.
#[tokio::test]
async fn search_respects_scopes() {
let engine = test_engine(false).await;
let scope_a = ScopeIdentifiers::for_user("a");
let scope_b = ScopeIdentifiers::for_user("b");
engine
.store("I play tennis")
.with_scope(scope_a.clone())
.enable_extraction(false)
.execute()
.await
.unwrap();
engine
.store("I play chess")
.with_scope(scope_b.clone())
.enable_extraction(false)
.execute()
.await
.unwrap();
let res = engine
.search("play")
.with_scope(scope_a.clone())
.limit(10)
.execute()
.await
.unwrap();
assert!(res.facts.iter().all(|f| f.scope == scope_a));
}
}
}