use std::{path::Path, sync::Arc};
use chrono::{DateTime, Utc};
use serde_json::{Value, json};
use crate::{
application::services::projection::Projection as _,
embedded::{Config, EmbeddedCore, IngestEvent, Query},
error::Result,
infrastructure::cluster::crdt::MergeStrategy,
};
use super::{
error::{PrimeError, PrimeResult},
projections::{
AdjacencyListProjection, Contradiction, ContradictionDetectionProjection,
CrossDomainProjection, GraphStatsProjection, NodeStateProjection, NodeTypeIndexProjection,
ReverseIndexProjection,
},
schema::SchemaProjection,
types::{
Direction, Edge, EdgeId, Node, NodeId, PrimeStats, edge_entity_id, event_types,
node_entity_id,
},
};
const PROJ_NODE_STATE: &str = "prime.node_state";
const PROJ_NODE_TYPE_INDEX: &str = "prime.node_type_index";
const PROJ_ADJACENCY: &str = "prime.adjacency";
const PROJ_REVERSE_INDEX: &str = "prime.reverse_index";
const PROJ_GRAPH_STATS: &str = "prime.graph_stats";
const PROJ_SCHEMA: &str = "prime.schema";
const PROJ_CONTRADICTION: &str = "prime.contradiction";
const PROJ_CROSS_DOMAIN: &str = "prime.cross_domain";
#[cfg(feature = "prime-vectors")]
const PROJ_VECTOR_INDEX: &str = "prime.vector_index";
pub struct Prime {
core: EmbeddedCore,
node_state: Arc<NodeStateProjection>,
node_type_index: Arc<NodeTypeIndexProjection>,
adjacency: Arc<AdjacencyListProjection>,
reverse_index: Arc<ReverseIndexProjection>,
graph_stats: Arc<GraphStatsProjection>,
schema: Arc<SchemaProjection>,
contradiction: Arc<ContradictionDetectionProjection>,
cross_domain: Arc<CrossDomainProjection>,
#[cfg(feature = "prime-vectors")]
vector_index: Arc<super::vectors::VectorIndexProjection>,
}
impl Prime {
fn config_builder() -> crate::embedded::ConfigBuilder {
Config::builder()
.merge_strategy(event_types::NODE_CREATED, MergeStrategy::FirstWriteWins)
.merge_strategy(event_types::NODE_UPDATED, MergeStrategy::LastWriteWins)
.merge_strategy(event_types::EDGE_CREATED, MergeStrategy::AppendOnly)
.merge_strategy(event_types::EDGE_DELETED, MergeStrategy::LastWriteWins)
.merge_strategy(event_types::NODE_DELETED, MergeStrategy::LastWriteWins)
}
#[allow(clippy::type_complexity)]
fn register_graph_projections(
store: &Arc<crate::store::EventStore>,
) -> (
Arc<NodeStateProjection>,
Arc<NodeTypeIndexProjection>,
Arc<AdjacencyListProjection>,
Arc<ReverseIndexProjection>,
Arc<GraphStatsProjection>,
Arc<SchemaProjection>,
Arc<ContradictionDetectionProjection>,
Arc<CrossDomainProjection>,
) {
let node_state = Arc::new(NodeStateProjection::new(PROJ_NODE_STATE));
let node_type_index = Arc::new(NodeTypeIndexProjection::new(PROJ_NODE_TYPE_INDEX));
let adjacency = Arc::new(AdjacencyListProjection::new_forward(PROJ_ADJACENCY));
let reverse_index = Arc::new(ReverseIndexProjection::new_reverse(PROJ_REVERSE_INDEX));
let graph_stats = Arc::new(GraphStatsProjection::new(PROJ_GRAPH_STATS));
let schema = Arc::new(SchemaProjection::new(PROJ_SCHEMA));
let contradiction = Arc::new(ContradictionDetectionProjection::new(PROJ_CONTRADICTION));
let cross_domain = Arc::new(CrossDomainProjection::new());
type DynProj = Arc<dyn crate::application::services::projection::Projection>;
let _ = store.register_projection_with_backfill(&(Arc::clone(&node_state) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&node_type_index) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&adjacency) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&reverse_index) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&graph_stats) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&schema) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&contradiction) as DynProj));
let _ = store.register_projection_with_backfill(&(Arc::clone(&cross_domain) as DynProj));
(
node_state,
node_type_index,
adjacency,
reverse_index,
graph_stats,
schema,
contradiction,
cross_domain,
)
}
#[cfg(feature = "prime-vectors")]
fn register_vector_projection(
store: &Arc<crate::store::EventStore>,
) -> Arc<super::vectors::VectorIndexProjection> {
type DynProj = Arc<dyn crate::application::services::projection::Projection>;
let vi = Arc::new(super::vectors::VectorIndexProjection::new(
PROJ_VECTOR_INDEX,
));
let _ = store.register_projection_with_backfill(&(Arc::clone(&vi) as DynProj));
vi
}
fn from_core(core: EmbeddedCore) -> Self {
let store = core.inner();
let (
node_state,
node_type_index,
adjacency,
reverse_index,
graph_stats,
schema,
contradiction,
cross_domain,
) = Self::register_graph_projections(&store);
#[cfg(feature = "prime-vectors")]
let vector_index = Self::register_vector_projection(&store);
Self {
core,
node_state,
node_type_index,
adjacency,
reverse_index,
graph_stats,
schema,
contradiction,
cross_domain,
#[cfg(feature = "prime-vectors")]
vector_index,
}
}
pub async fn open(path: impl AsRef<Path>) -> Result<Self> {
let config = Self::config_builder().data_dir(path.as_ref()).build()?;
let core = EmbeddedCore::open(config).await?;
Ok(Self::from_core(core))
}
pub async fn open_in_memory() -> Result<Self> {
let config = Self::config_builder().build()?;
let core = EmbeddedCore::open(config).await?;
Ok(Self::from_core(core))
}
pub async fn shutdown(self) -> Result<()> {
self.core.shutdown().await
}
pub fn stats(&self) -> PrimeStats {
self.graph_stats.stats()
}
pub fn core(&self) -> &EmbeddedCore {
&self.core
}
#[cfg(feature = "prime-recall")]
pub fn recall_deps(&self) -> super::recall::RecallDeps {
super::recall::RecallDeps {
domain_index: Arc::new(super::projections::DomainIndexProjection::new()),
cross_domain: Arc::clone(&self.cross_domain),
node_state: Some(Arc::clone(&self.node_state)),
adjacency: Some(Arc::clone(&self.adjacency)),
graph_stats: Some(Arc::clone(&self.graph_stats)),
}
}
pub async fn register_schema(
&self,
type_name: &str,
kind: super::schema::SchemaKind,
schema: Value,
) -> PrimeResult<()> {
let entity_id = format!("schema:{type_name}");
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: super::schema::SCHEMA_REGISTERED,
payload: json!({
"type_name": type_name,
"kind": match kind {
super::schema::SchemaKind::Node => "node",
super::schema::SchemaKind::Edge => "edge",
},
"schema": schema,
}),
metadata: None,
tenant_id: None,
})
.await?;
Ok(())
}
pub fn schemas(&self) -> Vec<super::schema::SchemaEntry> {
self.schema.schemas()
}
pub fn configure_exclusive(&self, relation: &str) {
let all_entity_ids = self.node_type_index.all_entity_ids();
self.contradiction.configure_exclusive(relation);
self.contradiction
.backfill_exclusive(relation, &self.adjacency, &all_entity_ids);
}
pub fn contradictions(&self) -> Vec<Contradiction> {
self.contradiction.contradictions()
}
pub async fn resolve_contradiction(
&self,
contradiction_id: &str,
keep_edge: &str,
) -> PrimeResult<()> {
if let Some(delete_edge) = self.contradiction.resolve(contradiction_id, keep_edge) {
self.delete_edge(&delete_edge).await?;
}
Ok(())
}
pub async fn add_node(
&self,
node_type: &str,
properties: serde_json::Value,
) -> PrimeResult<NodeId> {
self.schema.validate_node(node_type, &properties)?;
let id = uuid::Uuid::new_v4().to_string();
let entity_id = node_entity_id(node_type, &id);
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: event_types::NODE_CREATED,
payload: json!({
"id": id,
"node_type": node_type,
"properties": properties,
}),
metadata: None,
tenant_id: None,
})
.await?;
Ok(NodeId::new(id))
}
pub fn get_node(&self, entity_id: &str) -> Option<Node> {
let node = self.node_state.get_node(entity_id)?;
if node.deleted {
return None;
}
Some(node)
}
pub async fn update_node(
&self,
entity_id: &str,
properties: serde_json::Value,
) -> PrimeResult<()> {
let node = self
.node_state
.get_node(entity_id)
.filter(|n| !n.deleted)
.ok_or_else(|| PrimeError::NodeNotFound(entity_id.to_string()))?;
let mut merged = node.properties.clone();
if let (Value::Object(base), Value::Object(update)) = (&mut merged, &properties) {
for (k, v) in update {
base.insert(k.clone(), v.clone());
}
}
self.schema.validate_node(&node.node_type, &merged)?;
self.core
.ingest(IngestEvent {
entity_id,
event_type: event_types::NODE_UPDATED,
payload: json!({ "properties": properties }),
metadata: None,
tenant_id: None,
})
.await?;
Ok(())
}
pub async fn delete_node(&self, entity_id: &str) -> PrimeResult<()> {
if !self.node_state.is_live(entity_id) {
return Err(PrimeError::NodeNotFound(entity_id.to_string()));
}
let outgoing = self.adjacency.outgoing(entity_id);
let incoming = self.reverse_index.incoming(entity_id);
struct EdgeDel {
entity_id: String,
payload: serde_json::Value,
}
let mut edge_dels: Vec<EdgeDel> = Vec::with_capacity(outgoing.len() + incoming.len());
let mut seen_edge_ids = std::collections::HashSet::new();
for adj in &outgoing {
if seen_edge_ids.insert(adj.edge_id.clone()) {
edge_dels.push(EdgeDel {
entity_id: format!("edge:{}", adj.edge_id),
payload: json!({"id": adj.edge_id, "source": entity_id, "target": adj.peer}),
});
}
}
for adj in &incoming {
if seen_edge_ids.insert(adj.edge_id.clone()) {
edge_dels.push(EdgeDel {
entity_id: format!("edge:{}", adj.edge_id),
payload: json!({"id": adj.edge_id, "source": adj.peer, "target": entity_id}),
});
}
}
let mut batch: Vec<IngestEvent<'_>> = edge_dels
.iter()
.map(|ed| IngestEvent {
entity_id: &ed.entity_id,
event_type: event_types::EDGE_DELETED,
payload: ed.payload.clone(),
metadata: None,
tenant_id: None,
})
.collect();
batch.push(IngestEvent {
entity_id,
event_type: event_types::NODE_DELETED,
payload: json!({}),
metadata: None,
tenant_id: None,
});
self.core.ingest_batch(batch).await?;
Ok(())
}
pub fn nodes_by_type(&self, node_type: &str) -> Vec<Node> {
self.node_type_index
.nodes_by_type(node_type)
.iter()
.filter_map(|entity_id| self.get_node(entity_id))
.collect()
}
pub async fn add_edge(
&self,
source: &str,
target: &str,
relation: &str,
properties: Option<serde_json::Value>,
) -> PrimeResult<EdgeId> {
self.add_edge_inner(source, target, relation, None, properties)
.await
}
pub async fn add_edge_weighted(
&self,
source: &str,
target: &str,
relation: &str,
weight: f64,
properties: Option<serde_json::Value>,
) -> PrimeResult<EdgeId> {
self.add_edge_inner(source, target, relation, Some(weight), properties)
.await
}
async fn add_edge_inner(
&self,
source: &str,
target: &str,
relation: &str,
weight: Option<f64>,
properties: Option<serde_json::Value>,
) -> PrimeResult<EdgeId> {
if !self.node_state.is_live(source) {
return Err(PrimeError::NodeNotFound(source.to_string()));
}
if !self.node_state.is_live(target) {
return Err(PrimeError::NodeNotFound(target.to_string()));
}
self.schema.validate_edge(relation, properties.as_ref())?;
let id = uuid::Uuid::new_v4().to_string();
let entity_id = edge_entity_id(&id);
let mut payload = json!({
"id": id,
"source": source,
"target": target,
"relation": relation,
});
if let Some(w) = weight {
payload["weight"] = json!(w);
}
if let Some(props) = properties {
payload["properties"] = props;
}
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: event_types::EDGE_CREATED,
payload,
metadata: None,
tenant_id: None,
})
.await?;
Ok(EdgeId::new(id))
}
pub async fn get_edge(&self, edge_id: &str) -> PrimeResult<Option<Edge>> {
let entity_id = edge_entity_id(edge_id);
let events = self.core.query(Query::new().entity_id(&entity_id)).await?;
let is_deleted = events
.iter()
.any(|e| e.event_type == event_types::EDGE_DELETED);
if is_deleted {
return Ok(None);
}
let created = events
.iter()
.find(|e| e.event_type == event_types::EDGE_CREATED);
Ok(created.and_then(edge_from_event))
}
pub async fn delete_edge(&self, edge_id: &str) -> PrimeResult<()> {
let entity_id = edge_entity_id(edge_id);
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: event_types::EDGE_DELETED,
payload: json!({"id": edge_id}),
metadata: None,
tenant_id: None,
})
.await?;
Ok(())
}
pub async fn compact(
&self,
target_entity_id: &str,
source_entity_ids: &[&str],
) -> PrimeResult<()> {
let target_node = self
.get_node(target_entity_id)
.ok_or_else(|| PrimeError::NodeNotFound(target_entity_id.to_string()))?;
let mut merged_properties = target_node.properties.clone();
let mut merged_source_ids = Vec::new();
for &source_id in source_entity_ids {
let source_node = self
.get_node(source_id)
.ok_or_else(|| PrimeError::NodeNotFound(source_id.to_string()))?;
if let (Value::Object(target_map), Value::Object(source_map)) =
(&mut merged_properties, &source_node.properties)
{
for (key, value) in source_map {
target_map
.entry(key.clone())
.or_insert_with(|| value.clone());
}
}
merged_source_ids.push(source_id.to_string());
for entry in self.adjacency.outgoing(source_id) {
if entry.peer == target_entity_id {
continue;
}
let already_exists = self
.adjacency
.outgoing(target_entity_id)
.iter()
.any(|e| e.peer == entry.peer && e.relation == entry.relation);
if !already_exists {
self.core
.ingest(IngestEvent {
entity_id: &edge_entity_id(&uuid::Uuid::new_v4().to_string()),
event_type: event_types::EDGE_CREATED,
payload: json!({
"id": uuid::Uuid::new_v4().to_string(),
"source": target_entity_id,
"target": entry.peer,
"relation": entry.relation,
}),
metadata: None,
tenant_id: None,
})
.await?;
}
self.core
.ingest(IngestEvent {
entity_id: &edge_entity_id(&entry.edge_id),
event_type: event_types::EDGE_DELETED,
payload: json!({"id": entry.edge_id}),
metadata: None,
tenant_id: None,
})
.await?;
}
for entry in self.reverse_index.incoming(source_id) {
if entry.peer == target_entity_id {
continue;
}
let already_exists = self
.reverse_index
.incoming(target_entity_id)
.iter()
.any(|e| e.peer == entry.peer && e.relation == entry.relation);
if !already_exists {
self.core
.ingest(IngestEvent {
entity_id: &edge_entity_id(&uuid::Uuid::new_v4().to_string()),
event_type: event_types::EDGE_CREATED,
payload: json!({
"id": uuid::Uuid::new_v4().to_string(),
"source": entry.peer,
"target": target_entity_id,
"relation": entry.relation,
}),
metadata: None,
tenant_id: None,
})
.await?;
}
self.core
.ingest(IngestEvent {
entity_id: &edge_entity_id(&entry.edge_id),
event_type: event_types::EDGE_DELETED,
payload: json!({"id": entry.edge_id}),
metadata: None,
tenant_id: None,
})
.await?;
}
self.core
.ingest(IngestEvent {
entity_id: source_id,
event_type: event_types::NODE_DELETED,
payload: json!({}),
metadata: None,
tenant_id: None,
})
.await?;
}
self.core
.ingest(IngestEvent {
entity_id: target_entity_id,
event_type: event_types::NODE_UPDATED,
payload: json!({
"properties": merged_properties,
}),
metadata: None,
tenant_id: None,
})
.await?;
self.core
.ingest(IngestEvent {
entity_id: target_entity_id,
event_type: "prime.memory.compacted",
payload: json!({
"target": target_entity_id,
"merged_from": merged_source_ids,
}),
metadata: None,
tenant_id: None,
})
.await?;
#[cfg(feature = "prime-vectors")]
{
for source_id in &merged_source_ids {
let vec_entity_id = super::vectors::vector_entity_id(source_id);
if self.vector_index.get_state(&vec_entity_id).is_some() {
self.core
.ingest(IngestEvent {
entity_id: &vec_entity_id,
event_type: super::vectors::event_types::VECTOR_DELETED,
payload: json!({}),
metadata: None,
tenant_id: None,
})
.await?;
}
}
}
Ok(())
}
#[cfg(feature = "prime-vectors")]
pub async fn embed(&self, id: &str, text: Option<&str>, vector: Vec<f32>) -> PrimeResult<()> {
self.embed_with_metadata(id, text, vector, None).await
}
#[cfg(feature = "prime-vectors")]
pub async fn embed_with_metadata(
&self,
id: &str,
text: Option<&str>,
vector: Vec<f32>,
metadata: Option<serde_json::Value>,
) -> PrimeResult<()> {
let entity_id = super::vectors::vector_entity_id(id);
let dimensions = vector.len();
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: super::vectors::event_types::VECTOR_STORED,
payload: json!({
"text": text,
"dimensions": dimensions,
"metadata": metadata,
}),
metadata: Some(json!({ "embedding": vector })),
tenant_id: None,
})
.await?;
Ok(())
}
#[cfg(feature = "prime-vectors")]
pub fn similar(
&self,
id: &str,
top_k: usize,
) -> PrimeResult<Vec<super::vectors::VectorSearchResult>> {
let entity_id = super::vectors::vector_entity_id(id);
let state = self
.vector_index
.get_state(&entity_id)
.ok_or_else(|| PrimeError::NodeNotFound(id.to_string()))?;
let vector: Vec<f32> = state
.get("vector")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
Ok(self.vector_search(&vector, top_k))
}
#[cfg(feature = "prime-vectors")]
pub fn vector_search(
&self,
query: &[f32],
top_k: usize,
) -> Vec<super::vectors::VectorSearchResult> {
self.vector_index
.search(query, top_k)
.into_iter()
.map(|hit| super::vectors::VectorSearchResult {
id: hit.entity_id,
score: 1.0 - f64::from(hit.distance), text: hit.text,
metadata: hit.metadata,
})
.collect()
}
#[cfg(feature = "prime-vectors")]
pub fn vector_search_cross_domain(
&self,
query: &[f32],
top_k: usize,
) -> Vec<super::vectors::VectorSearchResult> {
use std::collections::HashMap;
let all_results = self.vector_search(query, top_k * 3);
if all_results.is_empty() {
return all_results;
}
let mut by_domain: HashMap<String, Vec<super::vectors::VectorSearchResult>> =
HashMap::new();
for result in all_results {
let domain = self
.domain_of(&result.id)
.unwrap_or_else(|| "unknown".to_string());
by_domain.entry(domain).or_default().push(result);
}
if by_domain.len() <= 1 {
let mut flat: Vec<_> = by_domain.into_values().flatten().collect();
flat.truncate(top_k);
return flat;
}
let mut merged = Vec::with_capacity(top_k);
let domain_count = by_domain.len();
let per_domain = (top_k / domain_count).max(1);
for results in by_domain.values() {
merged.extend(results.iter().take(per_domain).cloned());
}
merged.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
merged.truncate(top_k);
merged
}
#[cfg(feature = "prime-vectors")]
fn domain_of(&self, entity_id: &str) -> Option<String> {
let node_id = entity_id.strip_prefix("vec:").unwrap_or(entity_id);
let state = self.node_state.get_state(node_id)?;
if let Some(domain) = state.get("domain").and_then(|v| v.as_str()) {
return Some(domain.to_string());
}
if let Some(domain) = state
.get("properties")
.and_then(|p| p.get("domain"))
.and_then(|v| v.as_str())
{
return Some(domain.to_string());
}
state
.get("node_type")
.and_then(|v| v.as_str())
.map(String::from)
}
#[cfg(feature = "prime-vectors")]
pub async fn delete_vector(&self, id: &str) -> PrimeResult<()> {
let entity_id = super::vectors::vector_entity_id(id);
self.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: super::vectors::event_types::VECTOR_DELETED,
payload: json!({}),
metadata: None,
tenant_id: None,
})
.await?;
Ok(())
}
#[cfg(feature = "prime-vectors")]
pub fn get_vector(&self, id: &str) -> Option<super::vectors::VectorEntry> {
let entity_id = super::vectors::vector_entity_id(id);
let state = self.vector_index.get_state(&entity_id)?;
Some(super::vectors::VectorEntry {
id: id.to_string(),
text: state.get("text").and_then(|v| v.as_str()).map(String::from),
dimensions: state
.get("vector")
.and_then(serde_json::Value::as_array)
.map_or(0, Vec::len),
metadata: state.get("metadata").cloned(),
})
}
#[cfg(feature = "prime-vectors")]
pub async fn remember(
&self,
text: &str,
vector: Vec<f32>,
node_type: &str,
properties: serde_json::Value,
relations: &[(&str, &str)], ) -> PrimeResult<String> {
let node_id = self.add_node(node_type, properties).await?;
let entity_id = node_entity_id(node_type, node_id.as_str());
self.embed(&entity_id, Some(text), vector).await?;
for (target, relation) in relations {
self.add_edge(&entity_id, target, relation, None).await?;
}
Ok(entity_id)
}
#[cfg(feature = "prime-vectors")]
pub async fn forget(&self, entity_id: &str) -> PrimeResult<()> {
self.delete_vector(entity_id).await?;
self.delete_node(entity_id).await?;
Ok(())
}
#[cfg(feature = "prime-vectors")]
pub async fn recall(
&self,
query: super::types::RecallQuery,
) -> PrimeResult<super::types::RecallResult> {
use super::types::{RecallResult, ScoreComponents, ScoredNode};
use std::collections::HashMap;
let now = chrono::Utc::now();
let mut scored: HashMap<String, ScoredNode> = HashMap::new();
let mut vector_results = Vec::new();
let total_weight = query.similarity_weight + query.proximity_weight + query.recency_weight;
let (sw, pw, rw) = if total_weight > 0.0 {
(
query.similarity_weight / total_weight,
query.proximity_weight / total_weight,
query.recency_weight / total_weight,
)
} else {
(1.0 / 3.0, 1.0 / 3.0, 1.0 / 3.0)
};
if let Some(ref qvec) = query.vector {
let hits = self.vector_search(qvec, query.top_k * 2); vector_results = hits
.iter()
.map(|h| super::vectors::VectorSearchResult {
id: h.id.clone(),
score: h.score,
text: h.text.clone(),
metadata: h.metadata.clone(),
})
.collect();
for hit in &hits {
let graph_id = hit.id.strip_prefix("vec:").unwrap_or(&hit.id);
if let Some(node) = self.get_node(graph_id) {
let recency = recency_score(node.updated_at, now);
let components = ScoreComponents {
similarity: hit.score,
proximity: 1.0, recency,
};
let score = sw * hit.score + pw * 1.0 + rw * recency;
scored.insert(
graph_id.to_string(),
ScoredNode {
node,
score,
depth: 0,
components,
},
);
}
}
}
if query.depth > 0 {
let seeds: Vec<String> = scored.keys().cloned().collect();
for seed_id in &seeds {
let bfs_results =
self.neighbors_within(seed_id, query.depth, None, Direction::Both);
for (node, depth) in bfs_results {
if depth == 0 {
continue; }
let entity_id = node_entity_id(&node.node_type, node.id.as_str());
if scored.contains_key(&entity_id) {
continue;
}
if query
.node_type
.as_ref()
.is_some_and(|nt| node.node_type != *nt)
{
continue;
}
let proximity = 1.0 / (1.0 + depth as f64);
let recency = recency_score(node.updated_at, now);
let components = ScoreComponents {
similarity: 0.0, proximity,
recency,
};
let score = pw * proximity + rw * recency;
scored.insert(
entity_id,
ScoredNode {
node,
score,
depth,
components,
},
);
}
}
}
if query.vector.is_none()
&& let Some(ref nt) = query.node_type
{
let nodes = self.nodes_by_type(nt);
for node in nodes {
let entity_id = node_entity_id(&node.node_type, node.id.as_str());
let recency = recency_score(node.updated_at, now);
let components = ScoreComponents {
similarity: 0.0,
proximity: 0.0,
recency,
};
let score = rw * recency;
scored.insert(
entity_id,
ScoredNode {
node,
score,
depth: 0,
components,
},
);
}
}
let mut nodes: Vec<ScoredNode> = scored.into_values().collect();
nodes.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
mmr_rerank(&mut nodes, query.top_k, 0.7);
Ok(RecallResult {
nodes,
vectors: vector_results,
edges: Vec::new(),
})
}
pub fn neighbors(
&self,
entity_id: &str,
relation: Option<&str>,
direction: Direction,
) -> Vec<Node> {
let mut peer_ids: Vec<String> = Vec::new();
match direction {
Direction::Outgoing => {
for entry in self.adjacency.outgoing(entity_id) {
if relation.is_none() || relation == Some(entry.relation.as_str()) {
peer_ids.push(entry.peer.clone());
}
}
}
Direction::Incoming => {
for entry in self.reverse_index.incoming(entity_id) {
if relation.is_none() || relation == Some(entry.relation.as_str()) {
peer_ids.push(entry.peer.clone());
}
}
}
Direction::Both => {
let mut seen = std::collections::HashSet::new();
for entry in self.adjacency.outgoing(entity_id) {
if (relation.is_none() || relation == Some(entry.relation.as_str()))
&& seen.insert(entry.peer.clone())
{
peer_ids.push(entry.peer.clone());
}
}
for entry in self.reverse_index.incoming(entity_id) {
if (relation.is_none() || relation == Some(entry.relation.as_str()))
&& seen.insert(entry.peer.clone())
{
peer_ids.push(entry.peer.clone());
}
}
}
}
peer_ids.iter().filter_map(|id| self.get_node(id)).collect()
}
pub fn neighbors_within(
&self,
entity_id: &str,
depth: usize,
relation: Option<&str>,
direction: Direction,
) -> Vec<(Node, usize)> {
use std::collections::{HashSet, VecDeque};
let mut visited = HashSet::new();
let mut result = Vec::new();
let mut queue = VecDeque::new();
visited.insert(entity_id.to_string());
queue.push_back((entity_id.to_string(), 0usize));
while let Some((current, d)) = queue.pop_front() {
if let Some(node) = self.get_node(¤t) {
result.push((node, d));
}
if d < depth {
let peers = self.neighbor_ids(¤t, relation, direction);
for peer in peers {
if visited.insert(peer.clone()) {
queue.push_back((peer, d + 1));
}
}
}
}
result
}
pub fn subgraph(&self, center: &str, depth: usize) -> super::types::SubGraph {
use std::collections::HashSet;
let bfs = self.neighbors_within(center, depth, None, Direction::Both);
let node_ids: HashSet<String> = bfs
.iter()
.map(|(n, _)| {
node_entity_id(&n.node_type, n.id.as_str())
})
.collect();
let nodes: Vec<Node> = bfs.into_iter().map(|(n, _)| n).collect();
let mut edges = Vec::new();
let mut seen_edges = HashSet::new();
for entity_id in &node_ids {
for entry in self.adjacency.outgoing(entity_id) {
if node_ids.contains(&entry.peer) && seen_edges.insert(entry.edge_id.clone()) {
edges.push(Edge {
id: EdgeId::new(&entry.edge_id),
source: NodeId::new(entity_id.clone()),
target: NodeId::new(&entry.peer),
relation: entry.relation.clone(),
properties: None,
weight: entry.weight,
deleted: false,
created_at: chrono::Utc::now(),
});
}
}
}
super::types::SubGraph { nodes, edges }
}
pub fn shortest_path(&self, from: &str, to: &str, relation: Option<&str>) -> Option<Vec<Node>> {
use std::collections::{HashMap, VecDeque};
if from == to {
return self.get_node(from).map(|n| vec![n]);
}
let mut visited: HashMap<String, String> = HashMap::new(); let mut queue = VecDeque::new();
visited.insert(from.to_string(), String::new());
queue.push_back(from.to_string());
while let Some(current) = queue.pop_front() {
let peers = self.neighbor_ids(¤t, relation, Direction::Outgoing);
for peer in peers {
if visited.contains_key(&peer) {
continue;
}
visited.insert(peer.clone(), current.clone());
if peer == to {
let mut path_ids = vec![to.to_string()];
let mut cursor = to.to_string();
while let Some(parent) = visited.get(&cursor) {
if parent.is_empty() {
break;
}
path_ids.push(parent.clone());
cursor = parent.clone();
}
path_ids.reverse();
return Some(path_ids.iter().filter_map(|id| self.get_node(id)).collect());
}
queue.push_back(peer);
}
}
None }
pub fn shortest_path_weighted(
&self,
from: &str,
to: &str,
relation: Option<&str>,
) -> Option<(Vec<Node>, f64)> {
use std::{
cmp::Ordering,
collections::{BinaryHeap, HashMap},
};
if from == to {
return self.get_node(from).map(|n| (vec![n], 0.0));
}
#[derive(PartialEq)]
struct State {
cost: f64,
node: String,
}
impl Eq for State {}
impl PartialOrd for State {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for State {
fn cmp(&self, other: &Self) -> Ordering {
other
.cost
.partial_cmp(&self.cost)
.unwrap_or(Ordering::Equal)
}
}
let mut dist: HashMap<String, f64> = HashMap::new();
let mut prev: HashMap<String, String> = HashMap::new();
let mut heap = BinaryHeap::new();
dist.insert(from.to_string(), 0.0);
heap.push(State {
cost: 0.0,
node: from.to_string(),
});
while let Some(State { cost, node }) = heap.pop() {
if node == to {
let mut path_ids = vec![to.to_string()];
let mut cursor = to.to_string();
while let Some(parent) = prev.get(&cursor) {
path_ids.push(parent.clone());
cursor = parent.clone();
}
path_ids.reverse();
let nodes: Vec<Node> = path_ids.iter().filter_map(|id| self.get_node(id)).collect();
return Some((nodes, cost));
}
if cost > *dist.get(&node).unwrap_or(&f64::INFINITY) {
continue;
}
for entry in self.adjacency.outgoing(&node) {
if relation.is_some() && relation != Some(entry.relation.as_str()) {
continue;
}
let edge_weight = entry.weight.unwrap_or(1.0);
let next_cost = cost + edge_weight;
if next_cost < *dist.get(&entry.peer).unwrap_or(&f64::INFINITY) {
dist.insert(entry.peer.clone(), next_cost);
prev.insert(entry.peer.clone(), node.clone());
heap.push(State {
cost: next_cost,
node: entry.peer.clone(),
});
}
}
}
None
}
pub async fn history(&self, entity_id: &str) -> PrimeResult<Vec<super::types::HistoryEntry>> {
let events = self.core.query(Query::new().entity_id(entity_id)).await?;
let entries = events
.iter()
.map(|e| super::types::HistoryEntry {
event_type: e.event_type.clone(),
timestamp: e.timestamp,
payload: e.payload.clone(),
source: e
.metadata
.as_ref()
.and_then(|m| m.get("source"))
.and_then(|v| v.as_str())
.map(String::from),
})
.collect();
Ok(entries)
}
pub async fn diff(
&self,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> PrimeResult<super::types::GraphDiff> {
use super::types::GraphDiff;
let events = self
.core
.query(
Query::new()
.event_type_prefix("prime.")
.since(from)
.until(to),
)
.await?;
let mut diff = GraphDiff::default();
for event in &events {
let entity_id = event.entity_id.clone();
match event.event_type.as_str() {
event_types::NODE_CREATED => diff.nodes_added.push(entity_id),
event_types::NODE_UPDATED => diff.nodes_updated.push(entity_id),
event_types::NODE_DELETED => diff.nodes_deleted.push(entity_id),
event_types::EDGE_CREATED => diff.edges_added.push(entity_id),
event_types::EDGE_DELETED => diff.edges_deleted.push(entity_id),
#[cfg(feature = "prime-vectors")]
super::vectors::types::event_types::VECTOR_STORED => {
diff.vectors_stored.push(entity_id);
}
#[cfg(feature = "prime-vectors")]
super::vectors::types::event_types::VECTOR_DELETED => {
diff.vectors_deleted.push(entity_id);
}
_ => {}
}
}
Ok(diff)
}
pub async fn timeline(
&self,
entity_id: &str,
from: Option<DateTime<Utc>>,
to: Option<DateTime<Utc>>,
) -> PrimeResult<Vec<super::types::HistoryEntry>> {
let mut query = Query::new().entity_id(entity_id);
if let Some(f) = from {
query = query.since(f);
}
if let Some(t) = to {
query = query.until(t);
}
let events = self.core.query(query).await?;
let entries = events
.iter()
.map(|e| super::types::HistoryEntry {
event_type: e.event_type.clone(),
timestamp: e.timestamp,
payload: e.payload.clone(),
source: e
.metadata
.as_ref()
.and_then(|m| m.get("source"))
.and_then(|v| v.as_str())
.map(String::from),
})
.collect();
Ok(entries)
}
pub fn with_conversation<'a>(&'a self, conversation_id: &'a str) -> ConversationScope<'a> {
ConversationScope {
prime: self,
conversation_id,
}
}
pub async fn conversation_history(
&self,
conversation_id: &str,
) -> PrimeResult<Vec<super::types::HistoryEntry>> {
let all_events = self
.core
.query(Query::new().event_type_prefix("prime."))
.await?;
let entries = all_events
.iter()
.filter(|e| {
e.metadata
.as_ref()
.and_then(|m| m.get("conversation_id"))
.and_then(|v| v.as_str())
== Some(conversation_id)
})
.map(|e| super::types::HistoryEntry {
event_type: e.event_type.clone(),
timestamp: e.timestamp,
payload: e.payload.clone(),
source: e
.metadata
.as_ref()
.and_then(|m| m.get("source"))
.and_then(|v| v.as_str())
.map(String::from),
})
.collect();
Ok(entries)
}
pub async fn conversation_diff(
&self,
conversation_id: &str,
) -> PrimeResult<super::types::GraphDiff> {
let history = self.conversation_history(conversation_id).await?;
let mut diff = super::types::GraphDiff::default();
for entry in &history {
match entry.event_type.as_str() {
event_types::NODE_CREATED => diff.nodes_added.push(
entry
.payload
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
),
event_types::NODE_UPDATED => diff.nodes_updated.push(
entry
.payload
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
),
event_types::NODE_DELETED => diff.nodes_deleted.push(String::new()),
event_types::EDGE_CREATED => diff.edges_added.push(
entry
.payload
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
),
event_types::EDGE_DELETED => diff.edges_deleted.push(
entry
.payload
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
),
_ => {}
}
}
Ok(diff)
}
pub async fn get_node_as_of(
&self,
entity_id: &str,
as_of: DateTime<Utc>,
) -> PrimeResult<Option<Node>> {
let events = self
.core
.query(Query::new().entity_id(entity_id).until(as_of))
.await?;
if events.is_empty() {
return Ok(None);
}
let mut node_type = String::new();
let mut properties = serde_json::Map::new();
let mut domain: Option<String> = None;
let mut labels: Vec<String> = Vec::new();
let mut deleted = false;
let mut created_at = as_of;
for event in &events {
match event.event_type.as_str() {
event_types::NODE_CREATED => {
node_type = event
.payload
.get("node_type")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
if let Some(serde_json::Value::Object(props)) = event.payload.get("properties")
{
properties = props.clone();
}
domain = event
.payload
.get("domain")
.and_then(|v| v.as_str())
.map(String::from);
labels = event
.payload
.get("labels")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
deleted = false;
created_at = event.timestamp;
}
event_types::NODE_UPDATED => {
if let Some(serde_json::Value::Object(updates)) =
event.payload.get("properties")
{
for (key, value) in updates {
properties.insert(key.clone(), value.clone());
}
}
if let Some(d) = event.payload.get("domain") {
domain = d.as_str().map(String::from);
}
if let Some(l) = event
.payload
.get("labels")
.and_then(serde_json::Value::as_array)
{
labels = l
.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
}
}
event_types::NODE_DELETED => {
deleted = true;
}
_ => {}
}
}
if deleted || node_type.is_empty() {
return Ok(None);
}
let id = entity_id.split(':').nth(2).unwrap_or(entity_id).to_string();
Ok(Some(Node {
id: NodeId::new(id),
node_type,
properties: serde_json::Value::Object(properties),
domain,
labels,
deleted: false,
created_at,
updated_at: events.last().map_or(created_at, |e| e.timestamp),
}))
}
pub async fn neighbors_as_of(
&self,
entity_id: &str,
relation: Option<&str>,
as_of: DateTime<Utc>,
) -> PrimeResult<Vec<Node>> {
let all_events = self
.core
.query(Query::new().event_type_prefix("prime.edge.").until(as_of))
.await?;
let mut live_targets: std::collections::HashMap<String, String> =
std::collections::HashMap::new(); let mut deleted_edges: std::collections::HashSet<String> = std::collections::HashSet::new();
for event in &all_events {
match event.event_type.as_str() {
event_types::EDGE_CREATED => {
let source = event
.payload
.get("source")
.and_then(|v| v.as_str())
.unwrap_or("");
if source != entity_id {
continue;
}
let rel = event
.payload
.get("relation")
.and_then(|v| v.as_str())
.unwrap_or("");
if relation.is_some() && relation != Some(rel) {
continue;
}
if let Some(edge_id) = event.payload.get("id").and_then(|v| v.as_str()) {
let target = event
.payload
.get("target")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
live_targets.insert(edge_id.to_string(), target);
}
}
event_types::EDGE_DELETED => {
if let Some(edge_id) = event.payload.get("id").and_then(|v| v.as_str()) {
deleted_edges.insert(edge_id.to_string());
}
}
_ => {}
}
}
for deleted in &deleted_edges {
live_targets.remove(deleted);
}
let mut neighbors = Vec::new();
let mut seen = std::collections::HashSet::new();
for target in live_targets.values() {
if seen.insert(target.clone())
&& let Some(node) = Box::pin(self.get_node_as_of(target, as_of)).await?
{
neighbors.push(node);
}
}
Ok(neighbors)
}
fn neighbor_ids(
&self,
entity_id: &str,
relation: Option<&str>,
direction: Direction,
) -> Vec<String> {
let mut peers = Vec::new();
let mut seen = std::collections::HashSet::new();
let add_entries = |entries: Vec<super::projections::AdjEntry>,
peers: &mut Vec<String>,
seen: &mut std::collections::HashSet<String>| {
for entry in entries {
if (relation.is_none() || relation == Some(entry.relation.as_str()))
&& seen.insert(entry.peer.clone())
{
peers.push(entry.peer);
}
}
};
match direction {
Direction::Outgoing => {
add_entries(self.adjacency.outgoing(entity_id), &mut peers, &mut seen);
}
Direction::Incoming => {
add_entries(
self.reverse_index.incoming(entity_id),
&mut peers,
&mut seen,
);
}
Direction::Both => {
add_entries(self.adjacency.outgoing(entity_id), &mut peers, &mut seen);
add_entries(
self.reverse_index.incoming(entity_id),
&mut peers,
&mut seen,
);
}
}
peers
}
}
pub struct ConversationScope<'a> {
prime: &'a Prime,
conversation_id: &'a str,
}
impl ConversationScope<'_> {
fn metadata(&self) -> serde_json::Value {
json!({ "conversation_id": self.conversation_id })
}
pub async fn add_node(
&self,
node_type: &str,
properties: serde_json::Value,
) -> PrimeResult<NodeId> {
let id = uuid::Uuid::new_v4().to_string();
#[allow(deprecated)]
let entity_id = node_entity_id(node_type, &id);
self.prime
.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: event_types::NODE_CREATED,
payload: json!({
"id": id,
"node_type": node_type,
"properties": properties,
}),
metadata: Some(self.metadata()),
tenant_id: None,
})
.await?;
Ok(NodeId::new(id))
}
pub async fn add_edge(
&self,
source: &str,
target: &str,
relation: &str,
properties: Option<serde_json::Value>,
) -> PrimeResult<EdgeId> {
if !self.prime.node_state.is_live(source) {
return Err(PrimeError::NodeNotFound(source.to_string()));
}
if !self.prime.node_state.is_live(target) {
return Err(PrimeError::NodeNotFound(target.to_string()));
}
let id = uuid::Uuid::new_v4().to_string();
#[allow(deprecated)]
let entity_id = edge_entity_id(&id);
let mut payload = json!({
"id": id,
"source": source,
"target": target,
"relation": relation,
});
if let Some(props) = properties {
payload["properties"] = props;
}
self.prime
.core
.ingest(IngestEvent {
entity_id: &entity_id,
event_type: event_types::EDGE_CREATED,
payload,
metadata: Some(self.metadata()),
tenant_id: None,
})
.await?;
Ok(EdgeId::new(id))
}
pub async fn update_node(
&self,
entity_id: &str,
properties: serde_json::Value,
) -> PrimeResult<()> {
if !self.prime.node_state.is_live(entity_id) {
return Err(PrimeError::NodeNotFound(entity_id.to_string()));
}
self.prime
.core
.ingest(IngestEvent {
entity_id,
event_type: event_types::NODE_UPDATED,
payload: json!({ "properties": properties }),
metadata: Some(self.metadata()),
tenant_id: None,
})
.await?;
Ok(())
}
pub async fn delete_node(&self, entity_id: &str) -> PrimeResult<()> {
self.prime.delete_node(entity_id).await
}
pub fn get_node(&self, entity_id: &str) -> Option<Node> {
self.prime.get_node(entity_id)
}
pub fn neighbors(
&self,
entity_id: &str,
relation: Option<&str>,
direction: Direction,
) -> Vec<Node> {
self.prime.neighbors(entity_id, relation, direction)
}
pub fn stats(&self) -> PrimeStats {
self.prime.stats()
}
pub fn conversation_id(&self) -> &str {
self.conversation_id
}
}
#[cfg(feature = "prime-vectors")]
fn mmr_rerank(nodes: &mut Vec<super::types::ScoredNode>, top_k: usize, lambda: f64) {
use super::types::ScoredNode;
if nodes.len() <= top_k {
return;
}
let mut selected: Vec<ScoredNode> = Vec::with_capacity(top_k);
let mut remaining: Vec<ScoredNode> = std::mem::take(nodes);
while selected.len() < top_k && !remaining.is_empty() {
let mut best_idx = 0;
let mut best_mmr = f64::NEG_INFINITY;
for (i, candidate) in remaining.iter().enumerate() {
let relevance = candidate.score;
let max_redundancy = if selected.is_empty() {
0.0
} else {
selected
.iter()
.map(|s| {
if s.node.node_type == candidate.node.node_type
&& s.node.domain == candidate.node.domain
{
0.8
} else if s.node.domain == candidate.node.domain {
0.5
} else {
0.0
}
})
.fold(0.0f64, f64::max)
};
let mmr = lambda * relevance - (1.0 - lambda) * max_redundancy;
if mmr > best_mmr {
best_mmr = mmr;
best_idx = i;
}
}
selected.push(remaining.remove(best_idx));
}
*nodes = selected;
}
fn edge_from_event(event: &crate::embedded::EventView) -> Option<Edge> {
let payload = &event.payload;
let id = payload.get("id")?.as_str()?.to_string();
let source = payload.get("source")?.as_str()?.to_string();
let target = payload.get("target")?.as_str()?.to_string();
let relation = payload.get("relation")?.as_str()?.to_string();
let properties = payload.get("properties").cloned();
let weight = payload.get("weight").and_then(serde_json::Value::as_f64);
Some(Edge {
id: EdgeId::new(id),
source: NodeId::new(source),
target: NodeId::new(target),
relation,
properties,
weight,
deleted: false,
created_at: event.timestamp,
})
}
fn recency_score(
timestamp: chrono::DateTime<chrono::Utc>,
now: chrono::DateTime<chrono::Utc>,
) -> f64 {
let hours_ago = (now - timestamp).num_seconds().max(0) as f64 / 3600.0;
(-0.01 * hours_ago).exp()
}
fn node_from_state(entity_id: &str, state: &serde_json::Value) -> Option<Node> {
use super::types::NodeId;
use chrono::{DateTime, Utc};
let id = entity_id.split(':').nth(2).unwrap_or(entity_id).to_string();
let node_type = state.get("node_type")?.as_str()?.to_string();
let properties = state.get("properties").cloned().unwrap_or(json!({}));
let domain = state
.get("domain")
.and_then(|v| v.as_str())
.map(String::from);
let labels = state
.get("labels")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
})
.unwrap_or_default();
let deleted = state
.get("deleted")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let created_at: DateTime<Utc> = state
.get("created_at")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.unwrap_or_else(Utc::now);
let updated_at: DateTime<Utc> = state
.get("updated_at")
.and_then(|v| v.as_str())
.and_then(|s| s.parse().ok())
.unwrap_or_else(Utc::now);
Some(Node {
id: NodeId::new(id),
node_type,
properties,
domain,
labels,
deleted,
created_at,
updated_at,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_open_in_memory_and_shutdown() {
let prime = Prime::open_in_memory().await.unwrap();
let stats = prime.stats();
assert_eq!(stats.total_nodes, 0);
assert_eq!(stats.total_edges, 0);
assert_eq!(stats.event_count, 0);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_open_with_path_and_ingest() {
let dir = tempfile::tempdir().unwrap();
let prime = Prime::open(dir.path()).await.unwrap();
let _id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let stats = prime.stats();
assert_eq!(stats.total_nodes, 1);
assert_eq!(stats.event_count, 1);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_persistence_across_reopens() {
let dir = tempfile::tempdir().unwrap();
{
let prime = Prime::open(dir.path()).await.unwrap();
prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
prime.shutdown().await.unwrap();
}
{
let prime = Prime::open(dir.path()).await.unwrap();
let events = prime
.core()
.query(Query::new().event_type(event_types::NODE_CREATED))
.await
.unwrap();
assert_eq!(events.len(), 1);
prime.shutdown().await.unwrap();
}
}
#[tokio::test]
async fn test_stats_counts_nodes_and_edges() {
let prime = Prime::open_in_memory().await.unwrap();
prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
prime
.core()
.ingest(IngestEvent {
entity_id: "edge:e-1",
event_type: event_types::EDGE_CREATED,
payload: json!({
"id": "e-1",
"source": "alice",
"target": "bob",
"relation": "knows",
}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
let stats = prime.stats();
assert_eq!(stats.total_nodes, 2);
assert_eq!(stats.total_edges, 1);
assert_eq!(stats.event_count, 3);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_full_node_crud_lifecycle() {
let prime = Prime::open_in_memory().await.unwrap();
let id = prime
.add_node("person", json!({"name": "Alice", "age": 30}))
.await
.unwrap();
let entity_id = node_entity_id("person", id.as_str());
let node = prime.get_node(&entity_id).unwrap();
assert_eq!(node.node_type, "person");
assert_eq!(node.properties["name"], "Alice");
assert_eq!(node.properties["age"], 30);
prime
.update_node(&entity_id, json!({"role": "engineer", "age": 31}))
.await
.unwrap();
let node = prime.get_node(&entity_id).unwrap();
assert_eq!(node.properties["name"], "Alice"); assert_eq!(node.properties["role"], "engineer"); assert_eq!(node.properties["age"], 31);
prime.delete_node(&entity_id).await.unwrap();
assert!(prime.get_node(&entity_id).is_none());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_delete_node_cascades_edges() {
let prime = Prime::open_in_memory().await.unwrap();
let alice_id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let bob_id = prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
let alice_entity = node_entity_id("person", alice_id.as_str());
let bob_entity = node_entity_id("person", bob_id.as_str());
prime
.core()
.ingest(IngestEvent {
entity_id: "edge:e-1",
event_type: event_types::EDGE_CREATED,
payload: json!({
"id": "e-1",
"source": alice_entity,
"target": bob_entity,
"relation": "knows",
}),
metadata: None,
tenant_id: None,
})
.await
.unwrap();
assert_eq!(prime.adjacency.outgoing(&alice_entity).len(), 1);
prime.delete_node(&alice_entity).await.unwrap();
assert_eq!(prime.adjacency.outgoing(&alice_entity).len(), 0);
assert_eq!(prime.reverse_index.incoming(&bob_entity).len(), 0);
assert!(prime.get_node(&alice_entity).is_none());
assert!(prime.get_node(&bob_entity).is_some());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_update_nonexistent_node_returns_error() {
let prime = Prime::open_in_memory().await.unwrap();
let result = prime
.update_node("node:person:ghost", json!({"name": "Ghost"}))
.await;
assert!(result.is_err());
match result.unwrap_err() {
PrimeError::NodeNotFound(id) => assert_eq!(id, "node:person:ghost"),
other => panic!("expected NodeNotFound, got: {other}"),
}
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_nodes_by_type() {
let prime = Prime::open_in_memory().await.unwrap();
prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
prime
.add_node("project", json!({"name": "Prime"}))
.await
.unwrap();
let persons = prime.nodes_by_type("person");
assert_eq!(persons.len(), 2);
let projects = prime.nodes_by_type("project");
assert_eq!(projects.len(), 1);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_create_edge_between_nodes() {
let prime = Prime::open_in_memory().await.unwrap();
let alice_id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let bob_id = prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
let alice_entity = node_entity_id("person", alice_id.as_str());
let bob_entity = node_entity_id("person", bob_id.as_str());
let edge_id = prime
.add_edge(&alice_entity, &bob_entity, "knows", None)
.await
.unwrap();
let edge = prime.get_edge(edge_id.as_str()).await.unwrap().unwrap();
assert_eq!(edge.relation, "knows");
assert_eq!(edge.source.as_str(), &alice_entity);
assert_eq!(edge.target.as_str(), &bob_entity);
assert_eq!(prime.adjacency.outgoing(&alice_entity).len(), 1);
assert_eq!(prime.reverse_index.incoming(&bob_entity).len(), 1);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_create_edge_to_nonexistent_node() {
let prime = Prime::open_in_memory().await.unwrap();
let alice_id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let alice_entity = node_entity_id("person", alice_id.as_str());
let result = prime
.add_edge(&alice_entity, "node:person:ghost", "knows", None)
.await;
assert!(result.is_err());
match result.unwrap_err() {
PrimeError::NodeNotFound(id) => assert_eq!(id, "node:person:ghost"),
other => panic!("expected NodeNotFound, got: {other}"),
}
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_delete_edge() {
let prime = Prime::open_in_memory().await.unwrap();
let alice_id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let bob_id = prime
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
let alice_entity = node_entity_id("person", alice_id.as_str());
let bob_entity = node_entity_id("person", bob_id.as_str());
let edge_id = prime
.add_edge(&alice_entity, &bob_entity, "knows", None)
.await
.unwrap();
prime.delete_edge(edge_id.as_str()).await.unwrap();
let edge = prime.get_edge(edge_id.as_str()).await.unwrap();
assert!(edge.is_none());
assert_eq!(prime.adjacency.outgoing(&alice_entity).len(), 0);
assert_eq!(prime.reverse_index.incoming(&bob_entity).len(), 0);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_weighted_edge() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime
.add_node("person", json!({"name": "A"}))
.await
.unwrap();
let b = prime
.add_node("person", json!({"name": "B"}))
.await
.unwrap();
let a_entity = node_entity_id("person", a.as_str());
let b_entity = node_entity_id("person", b.as_str());
let edge_id = prime
.add_edge_weighted(&a_entity, &b_entity, "trust", 0.95, None)
.await
.unwrap();
let edge = prime.get_edge(edge_id.as_str()).await.unwrap().unwrap();
assert_eq!(edge.weight, Some(0.95));
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_neighbors_all_directions() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime
.add_node("person", json!({"name": "A"}))
.await
.unwrap();
let b = prime
.add_node("person", json!({"name": "B"}))
.await
.unwrap();
let c = prime
.add_node("person", json!({"name": "C"}))
.await
.unwrap();
let d = prime
.add_node("person", json!({"name": "D"}))
.await
.unwrap();
let a_e = node_entity_id("person", a.as_str());
let b_e = node_entity_id("person", b.as_str());
let c_e = node_entity_id("person", c.as_str());
let d_e = node_entity_id("person", d.as_str());
prime.add_edge(&a_e, &b_e, "works_on", None).await.unwrap();
prime.add_edge(&a_e, &c_e, "knows", None).await.unwrap();
prime.add_edge(&d_e, &a_e, "manages", None).await.unwrap();
let out = prime.neighbors(&a_e, None, Direction::Outgoing);
assert_eq!(out.len(), 2);
let out_filtered = prime.neighbors(&a_e, Some("works_on"), Direction::Outgoing);
assert_eq!(out_filtered.len(), 1);
assert_eq!(out_filtered[0].properties["name"], "B");
let inc = prime.neighbors(&a_e, None, Direction::Incoming);
assert_eq!(inc.len(), 1);
assert_eq!(inc[0].properties["name"], "D");
let both = prime.neighbors(&a_e, None, Direction::Both);
assert_eq!(both.len(), 3);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_bfs_diamond_graph() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let d = prime.add_node("n", json!({"name": "D"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
let ce = node_entity_id("n", c.as_str());
let de = node_entity_id("n", d.as_str());
prime.add_edge(&ae, &be, "link", None).await.unwrap();
prime.add_edge(&ae, &ce, "link", None).await.unwrap();
prime.add_edge(&be, &de, "link", None).await.unwrap();
prime.add_edge(&ce, &de, "link", None).await.unwrap();
let results = prime.neighbors_within(&ae, 2, None, Direction::Outgoing);
assert_eq!(results.len(), 4);
assert_eq!(results[0].1, 0); let depth_1: Vec<_> = results.iter().filter(|(_, d)| *d == 1).collect();
assert_eq!(depth_1.len(), 2);
let depth_2: Vec<_> = results.iter().filter(|(_, d)| *d == 2).collect();
assert_eq!(depth_2.len(), 1);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_subgraph_extraction() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
let ce = node_entity_id("n", c.as_str());
prime.add_edge(&ae, &be, "link", None).await.unwrap();
prime.add_edge(&ae, &ce, "link", None).await.unwrap();
let sg = prime.subgraph(&ae, 1);
assert_eq!(sg.nodes.len(), 3); assert_eq!(sg.edges.len(), 2);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_bfs_cycle_terminates() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
let ce = node_entity_id("n", c.as_str());
prime.add_edge(&ae, &be, "link", None).await.unwrap();
prime.add_edge(&be, &ce, "link", None).await.unwrap();
prime.add_edge(&ce, &ae, "link", None).await.unwrap();
let results = prime.neighbors_within(&ae, 10, None, Direction::Outgoing);
assert_eq!(results.len(), 3);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_shortest_path_bfs() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let d = prime.add_node("n", json!({"name": "D"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
let ce = node_entity_id("n", c.as_str());
let de = node_entity_id("n", d.as_str());
prime.add_edge(&ae, &be, "link", None).await.unwrap();
prime.add_edge(&be, &ce, "link", None).await.unwrap();
prime.add_edge(&ce, &de, "link", None).await.unwrap();
let path = prime.shortest_path(&ae, &de, None).unwrap();
assert_eq!(path.len(), 4);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_shortest_path_weighted_dijkstra() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let d = prime.add_node("n", json!({"name": "D"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
let ce = node_entity_id("n", c.as_str());
let de = node_entity_id("n", d.as_str());
prime
.add_edge_weighted(&ae, &be, "link", 1.0, None)
.await
.unwrap();
prime
.add_edge_weighted(&ae, &ce, "link", 3.0, None)
.await
.unwrap();
prime
.add_edge_weighted(&be, &de, "link", 1.0, None)
.await
.unwrap();
prime
.add_edge_weighted(&ce, &de, "link", 1.0, None)
.await
.unwrap();
let (path, cost) = prime.shortest_path_weighted(&ae, &de, None).unwrap();
assert_eq!(path.len(), 3); assert!((cost - 2.0).abs() < f64::EPSILON);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_shortest_path_disconnected() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
assert!(prime.shortest_path(&ae, &be, None).is_none());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_history_full_lifecycle() {
let prime = Prime::open_in_memory().await.unwrap();
let id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let entity_id = node_entity_id("person", id.as_str());
prime
.update_node(&entity_id, json!({"role": "engineer"}))
.await
.unwrap();
prime
.update_node(&entity_id, json!({"level": "senior"}))
.await
.unwrap();
prime.delete_node(&entity_id).await.unwrap();
let history = prime.history(&entity_id).await.unwrap();
assert_eq!(history.len(), 4); assert_eq!(history[0].event_type, event_types::NODE_CREATED);
assert_eq!(history[1].event_type, event_types::NODE_UPDATED);
assert_eq!(history[2].event_type, event_types::NODE_UPDATED);
assert_eq!(history[3].event_type, event_types::NODE_DELETED);
for window in history.windows(2) {
assert!(window[0].timestamp <= window[1].timestamp);
}
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_history_nonexistent_entity_returns_empty() {
let prime = Prime::open_in_memory().await.unwrap();
let history = prime.history("node:person:ghost").await.unwrap();
assert!(history.is_empty());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_history_edge_events() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let a_e = node_entity_id("n", a.as_str());
let b_e = node_entity_id("n", b.as_str());
let edge_id = prime.add_edge(&a_e, &b_e, "knows", None).await.unwrap();
let edge_entity = edge_entity_id(edge_id.as_str());
prime.delete_edge(edge_id.as_str()).await.unwrap();
let history = prime.history(&edge_entity).await.unwrap();
assert_eq!(history.len(), 2); assert_eq!(history[0].event_type, event_types::EDGE_CREATED);
assert_eq!(history[1].event_type, event_types::EDGE_DELETED);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_diff_between_timestamps() {
use chrono::Utc;
let prime = Prime::open_in_memory().await.unwrap();
let t1 = Utc::now();
let a = prime
.add_node("person", json!({"name": "A"}))
.await
.unwrap();
let b = prime
.add_node("person", json!({"name": "B"}))
.await
.unwrap();
let c = prime
.add_node("project", json!({"name": "C"}))
.await
.unwrap();
let a_e = node_entity_id("person", a.as_str());
let b_e = node_entity_id("person", b.as_str());
prime.add_edge(&a_e, &b_e, "knows", None).await.unwrap();
prime
.add_edge(
&a_e,
&node_entity_id("project", c.as_str()),
"works_on",
None,
)
.await
.unwrap();
let t2 = Utc::now();
let diff = prime.diff(t1, t2).await.unwrap();
assert_eq!(diff.nodes_added.len(), 3);
assert_eq!(diff.edges_added.len(), 2);
assert!(diff.nodes_deleted.is_empty());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_timeline_with_range() {
use chrono::Utc;
let prime = Prime::open_in_memory().await.unwrap();
let id = prime
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
let entity_id = node_entity_id("person", id.as_str());
let t_mid = Utc::now();
prime
.update_node(&entity_id, json!({"role": "engineer"}))
.await
.unwrap();
let t_end = Utc::now();
let full = prime.timeline(&entity_id, None, None).await.unwrap();
assert_eq!(full.len(), 2);
let after_mid = prime
.timeline(&entity_id, Some(t_mid), Some(t_end))
.await
.unwrap();
assert_eq!(after_mid.len(), 1);
assert_eq!(after_mid[0].event_type, event_types::NODE_UPDATED);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_shortest_path_to_self() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let path = prime.shortest_path(&ae, &ae, None).unwrap();
assert_eq!(path.len(), 1);
assert_eq!(path[0].properties["name"], "A");
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_neighbors_as_of() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
prime.add_edge(&ae, &be, "knows", None).await.unwrap();
let t2 = chrono::Utc::now();
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let c = prime.add_node("n", json!({"name": "C"})).await.unwrap();
let ce = node_entity_id("n", c.as_str());
prime.add_edge(&ae, &ce, "knows", None).await.unwrap();
let neighbors_at_t2 = prime.neighbors_as_of(&ae, None, t2).await.unwrap();
assert_eq!(neighbors_at_t2.len(), 1);
assert_eq!(neighbors_at_t2[0].properties["name"], "B");
let neighbors_now = prime
.neighbors_as_of(&ae, None, chrono::Utc::now())
.await
.unwrap();
assert_eq!(neighbors_now.len(), 2);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_get_node_as_of_deleted() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let t1 = chrono::Utc::now();
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
prime.delete_node(&ae).await.unwrap();
let node_at_t1 = prime.get_node_as_of(&ae, t1).await.unwrap();
assert!(node_at_t1.is_some());
assert_eq!(node_at_t1.unwrap().properties["name"], "A");
let node_now = prime.get_node_as_of(&ae, chrono::Utc::now()).await.unwrap();
assert!(node_now.is_none());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_compact_merges_properties() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime
.add_node("person", json!({"name": "Alice", "age": 30}))
.await
.unwrap();
let b = prime
.add_node(
"person",
json!({"name": "Alice Smith", "email": "alice@example.com"}),
)
.await
.unwrap();
let a_e = node_entity_id("person", a.as_str());
let b_e = node_entity_id("person", b.as_str());
prime.compact(&a_e, &[&b_e]).await.unwrap();
let node = prime.get_node(&a_e).unwrap();
assert_eq!(node.properties["name"], "Alice"); assert_eq!(node.properties["age"], 30); assert_eq!(node.properties["email"], "alice@example.com");
assert!(prime.get_node(&b_e).is_none());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_compact_redirects_edges() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime
.add_node("person", json!({"name": "A"}))
.await
.unwrap();
let b = prime
.add_node("person", json!({"name": "B"}))
.await
.unwrap();
let c = prime
.add_node("project", json!({"name": "C"}))
.await
.unwrap();
let a_e = node_entity_id("person", a.as_str());
let b_e = node_entity_id("person", b.as_str());
let c_e = node_entity_id("project", c.as_str());
prime.add_edge(&b_e, &c_e, "works_on", None).await.unwrap();
prime.compact(&a_e, &[&b_e]).await.unwrap();
let neighbors = prime.neighbors(&a_e, Some("works_on"), Direction::Outgoing);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].properties["name"], "C");
assert!(prime.get_node(&b_e).is_none());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_compact_emits_compacted_event() {
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let a_e = node_entity_id("n", a.as_str());
let b_e = node_entity_id("n", b.as_str());
prime.compact(&a_e, &[&b_e]).await.unwrap();
let history = prime.history(&a_e).await.unwrap();
let compacted = history
.iter()
.find(|h| h.event_type == "prime.memory.compacted");
assert!(compacted.is_some());
let payload = &compacted.unwrap().payload;
assert_eq!(payload["target"], a_e);
assert!(
payload["merged_from"]
.as_array()
.unwrap()
.contains(&json!(b_e))
);
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_compact_nonexistent_target_fails() {
let prime = Prime::open_in_memory().await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let b_e = node_entity_id("n", b.as_str());
let result = prime.compact("node:n:ghost", &[&b_e]).await;
assert!(result.is_err());
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_embed_and_vector_search() {
let prime = Prime::open_in_memory().await.unwrap();
prime
.embed("doc-1", Some("Rust is fast"), vec![1.0, 0.0, 0.0, 0.0])
.await
.unwrap();
prime
.embed("doc-2", Some("Rust is safe"), vec![0.9, 0.1, 0.0, 0.0])
.await
.unwrap();
prime
.embed("doc-3", Some("Python is dynamic"), vec![0.0, 0.0, 1.0, 0.0])
.await
.unwrap();
let results = prime.vector_search(&[1.0, 0.0, 0.0, 0.0], 3);
assert_eq!(results.len(), 3);
assert!(results[0].id.contains("doc-1"));
assert!(results[0].score > 0.99);
assert!(results[2].id.contains("doc-3"));
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_embed_delete_excludes_from_search() {
let prime = Prime::open_in_memory().await.unwrap();
prime
.embed("doc-a", Some("Hello"), vec![1.0, 0.0])
.await
.unwrap();
prime
.embed("doc-b", Some("World"), vec![0.9, 0.1])
.await
.unwrap();
prime.delete_vector("doc-a").await.unwrap();
let results = prime.vector_search(&[1.0, 0.0], 10);
assert_eq!(results.len(), 1);
assert!(results[0].id.contains("doc-b"));
assert!(prime.get_vector("doc-a").is_none());
assert!(prime.get_vector("doc-b").is_some());
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_similar_finds_close_vectors() {
let prime = Prime::open_in_memory().await.unwrap();
prime
.embed("ref", Some("Reference"), vec![1.0, 0.0, 0.0])
.await
.unwrap();
prime
.embed("close", Some("Close"), vec![0.95, 0.05, 0.0])
.await
.unwrap();
prime
.embed("far", Some("Far"), vec![0.0, 0.0, 1.0])
.await
.unwrap();
let similar = prime.similar("ref", 3).unwrap();
assert_eq!(similar.len(), 3);
assert!(similar[0].id.contains("ref"));
assert!(similar[1].id.contains("close"));
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_recall_vector_plus_graph() {
use crate::prime::types::RecallQuery;
let prime = Prime::open_in_memory().await.unwrap();
let a = prime
.add_node("concept", json!({"name": "Rust"}))
.await
.unwrap();
let b = prime
.add_node("concept", json!({"name": "Safety"}))
.await
.unwrap();
let c = prime
.add_node("concept", json!({"name": "Speed"}))
.await
.unwrap();
let ae = node_entity_id("concept", a.as_str());
let be = node_entity_id("concept", b.as_str());
let ce = node_entity_id("concept", c.as_str());
prime.add_edge(&ae, &be, "related", None).await.unwrap();
prime.add_edge(&ae, &ce, "related", None).await.unwrap();
prime
.embed(&ae, Some("Rust"), vec![1.0, 0.0, 0.0, 0.0])
.await
.unwrap();
prime
.embed(&be, Some("Safety"), vec![0.8, 0.2, 0.0, 0.0])
.await
.unwrap();
prime
.embed(&ce, Some("Speed"), vec![0.0, 0.0, 1.0, 0.0])
.await
.unwrap();
let result = prime
.recall(RecallQuery {
vector: Some(vec![1.0, 0.0, 0.0, 0.0]),
depth: 1,
top_k: 10,
similarity_weight: 0.5,
proximity_weight: 0.3,
recency_weight: 0.2,
..Default::default()
})
.await
.unwrap();
assert!(!result.nodes.is_empty());
assert!(!result.vectors.is_empty());
assert!(result.nodes[0].score > 0.0);
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_recall_recency_weight() {
use crate::prime::types::RecallQuery;
let prime = Prime::open_in_memory().await.unwrap();
prime
.add_node("fact", json!({"name": "New"}))
.await
.unwrap();
prime
.add_node("fact", json!({"name": "Also New"}))
.await
.unwrap();
let result = prime
.recall(RecallQuery {
node_type: Some("fact".to_string()),
recency_weight: 1.0,
similarity_weight: 0.0,
proximity_weight: 0.0,
top_k: 10,
..Default::default()
})
.await
.unwrap();
assert_eq!(result.nodes.len(), 2);
for node in &result.nodes {
assert!(node.components.recency > 0.99);
}
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_recall_depth_zero_only_direct_matches() {
use crate::prime::types::RecallQuery;
let prime = Prime::open_in_memory().await.unwrap();
let a = prime.add_node("n", json!({"name": "A"})).await.unwrap();
let b = prime.add_node("n", json!({"name": "B"})).await.unwrap();
let ae = node_entity_id("n", a.as_str());
let be = node_entity_id("n", b.as_str());
prime.add_edge(&ae, &be, "link", None).await.unwrap();
prime.embed(&ae, Some("A"), vec![1.0, 0.0]).await.unwrap();
let result = prime
.recall(RecallQuery {
vector: Some(vec![1.0, 0.0]),
depth: 0, top_k: 10,
..Default::default()
})
.await
.unwrap();
assert_eq!(result.nodes.len(), 1);
assert_eq!(result.nodes[0].depth, 0);
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_remember_creates_node_vector_edges() {
let prime = Prime::open_in_memory().await.unwrap();
let project = prime
.add_node("project", json!({"name": "Prime"}))
.await
.unwrap();
let project_entity = node_entity_id("project", project.as_str());
let entity_id = prime
.remember(
"Rust is a systems language",
vec![1.0, 0.0, 0.0],
"concept",
json!({"name": "Rust"}),
&[(&project_entity, "related_to")],
)
.await
.unwrap();
let node = prime.get_node(&entity_id).unwrap();
assert_eq!(node.node_type, "concept");
assert_eq!(node.properties["name"], "Rust");
let results = prime.vector_search(&[1.0, 0.0, 0.0], 5);
assert!(!results.is_empty());
let neighbors = prime.neighbors(&entity_id, Some("related_to"), Direction::Outgoing);
assert_eq!(neighbors.len(), 1);
assert_eq!(neighbors[0].properties["name"], "Prime");
prime.shutdown().await.unwrap();
}
#[cfg(feature = "prime-vectors")]
#[tokio::test]
async fn test_forget_removes_node_vector_edges() {
let prime = Prime::open_in_memory().await.unwrap();
let entity_id = prime
.remember(
"Ephemeral knowledge",
vec![0.5, 0.5],
"fact",
json!({"name": "temp"}),
&[],
)
.await
.unwrap();
assert!(prime.get_node(&entity_id).is_some());
assert!(!prime.vector_search(&[0.5, 0.5], 5).is_empty());
prime.forget(&entity_id).await.unwrap();
assert!(prime.get_node(&entity_id).is_none());
let results = prime.vector_search(&[0.5, 0.5], 5);
assert!(results.is_empty());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_conversation_scoping_isolates_history() {
let prime = Prime::open_in_memory().await.unwrap();
let conv1 = prime.with_conversation("conv-1");
let a = conv1
.add_node("person", json!({"name": "Alice"}))
.await
.unwrap();
#[allow(deprecated)]
let ae = node_entity_id("person", a.as_str());
let conv2 = prime.with_conversation("conv-2");
let b = conv2
.add_node("person", json!({"name": "Bob"}))
.await
.unwrap();
#[allow(deprecated)]
let _be = node_entity_id("person", b.as_str());
let h1 = prime.conversation_history("conv-1").await.unwrap();
assert_eq!(h1.len(), 1);
let h2 = prime.conversation_history("conv-2").await.unwrap();
assert_eq!(h2.len(), 1);
assert!(prime.get_node(&ae).is_some());
prime.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_conversation_diff() {
let prime = Prime::open_in_memory().await.unwrap();
let conv = prime.with_conversation("conv-x");
conv.add_node("concept", json!({"name": "Rust"}))
.await
.unwrap();
conv.add_node("concept", json!({"name": "Safety"}))
.await
.unwrap();
let diff = prime.conversation_diff("conv-x").await.unwrap();
assert_eq!(diff.nodes_added.len(), 2);
prime.shutdown().await.unwrap();
}
}