use std::num::NonZeroUsize;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::{AtomicBool, Ordering};
use hirn_core::content::MemoryContent;
use hirn_core::embed::Embedder;
use hirn_core::episodic::EpisodicRecord;
use hirn_core::metadata::MetadataValue;
use hirn_core::record::MemoryRecord;
use hirn_core::timestamp::Timestamp;
use hirn_core::types::{AgentId, EdgeRelation, Namespace};
use hirn_core::{HirnConfig, MemoryId};
use hirn_engine::GraphStore;
use hirn_engine::HirnDB;
use hirn_provider::PseudoEmbedder;
use hirn_storage::{HirnDb, HirnDbConfig, PhysicalStore};
use tokio::time::{Duration, timeout};
struct TrackingEmbedder {
inner: PseudoEmbedder,
call_count: std::sync::atomic::AtomicUsize,
}
impl TrackingEmbedder {
fn new(dims: usize) -> Self {
Self {
inner: PseudoEmbedder::new(dims),
call_count: std::sync::atomic::AtomicUsize::new(0),
}
}
fn call_count(&self) -> usize {
self.call_count.load(std::sync::atomic::Ordering::Relaxed)
}
}
#[async_trait::async_trait]
impl hirn_core::embed::Embedder for TrackingEmbedder {
async fn embed(
&self,
texts: &[&str],
) -> hirn_core::HirnResult<Vec<hirn_core::embed::Embedding>> {
self.call_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.inner.embed(texts).await
}
fn dimensions(&self) -> usize {
self.inner.dimensions()
}
fn model_id(&self) -> &str {
"tracking-pseudo"
}
fn max_input_tokens(&self) -> usize {
usize::MAX
}
}
struct BlockingEmbedder {
inner: PseudoEmbedder,
blocked_once: AtomicBool,
entered_tx: StdMutex<Option<tokio::sync::oneshot::Sender<()>>>,
release: tokio::sync::Notify,
}
impl BlockingEmbedder {
fn new(dims: usize) -> (Self, tokio::sync::oneshot::Receiver<()>) {
let (entered_tx, entered_rx) = tokio::sync::oneshot::channel();
(
Self {
inner: PseudoEmbedder::new(dims),
blocked_once: AtomicBool::new(false),
entered_tx: StdMutex::new(Some(entered_tx)),
release: tokio::sync::Notify::new(),
},
entered_rx,
)
}
fn release(&self) {
self.release.notify_waiters();
}
}
#[async_trait::async_trait]
impl hirn_core::embed::Embedder for BlockingEmbedder {
async fn embed(
&self,
texts: &[&str],
) -> hirn_core::HirnResult<Vec<hirn_core::embed::Embedding>> {
if !self.blocked_once.swap(true, Ordering::SeqCst) {
let entered_tx = self
.entered_tx
.lock()
.expect("entered lock poisoned")
.take();
if let Some(tx) = entered_tx {
let _ = tx.send(());
}
self.release.notified().await;
}
self.inner.embed(texts).await
}
fn dimensions(&self) -> usize {
self.inner.dimensions()
}
fn model_id(&self) -> &str {
"blocking-pseudo"
}
fn max_input_tokens(&self) -> usize {
usize::MAX
}
}
const DIM: usize = 32;
fn agent() -> AgentId {
AgentId::new("test_agent").unwrap()
}
fn rand_vec(seed: u128) -> Vec<f32> {
(0..DIM)
.map(|i| (seed as f64).mul_add(0.618_033, i as f64 * 0.414_213).sin() as f32)
.collect()
}
fn axis_vec(offset: f32) -> Vec<f32> {
let mut embedding = vec![0.0; DIM];
embedding[0] = offset;
embedding
}
fn basis_vec(dim: usize) -> Vec<f32> {
let mut embedding = vec![0.0f32; DIM];
embedding[dim] = 1.0;
embedding
}
fn episode_in(
namespace: Namespace,
content: impl Into<String>,
embedding: Vec<f32>,
) -> EpisodicRecord {
EpisodicRecord::builder()
.content(content)
.agent_id(agent())
.namespace(namespace)
.embedding(embedding)
.build()
.unwrap()
}
fn episode_at(
namespace: Namespace,
content: impl Into<String>,
embedding: Vec<f32>,
timestamp: Timestamp,
) -> EpisodicRecord {
EpisodicRecord::builder()
.content(content)
.agent_id(agent())
.namespace(namespace)
.embedding(embedding)
.timestamp(timestamp)
.build()
.unwrap()
}
async fn episodic_importance(db: &HirnDB, query: Vec<f32>, id: MemoryId) -> f32 {
let recalled = db
.recall_view()
.query(query)
.limit(64)
.execute()
.await
.unwrap();
let found = recalled
.iter()
.find(|result| result.record.id() == id)
.unwrap();
match &found.record {
MemoryRecord::Episodic(record) => record.importance,
_ => panic!("Expected episodic record"),
}
}
async fn rpe_db() -> (HirnDB, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("rpe_test");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.3)
.rpe_similarity_search_limit(5)
.build()
.unwrap();
let db = HirnDB::open_with_config(config, backend).await.unwrap();
(db, dir)
}
async fn full_write_path_db() -> (HirnDB, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("wp_test");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.3)
.rpe_similarity_search_limit(5)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_timeout_secs(5)
.svo_extraction_enabled(true)
.svo_confidence_threshold(0.5)
.build()
.unwrap();
let db = HirnDB::open_with_config(config, backend).await.unwrap();
(db, dir)
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_near_duplicate_gets_fast_path() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
let id1 = db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog again")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
let id2 = db.episodic().remember(r2).await.unwrap();
assert_ne!(id1, id2);
let recalled = db
.recall_view()
.query(emb)
.limit(10)
.execute()
.await
.unwrap();
assert!(recalled.len() >= 2);
let second = recalled.iter().find(|r| r.record.id() == id2).unwrap();
let importance = match &second.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance < 0.5,
"Fast-path importance should be < 0.5, got {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_novel_content_gets_slow_path() {
let (db, _dir) = rpe_db().await;
let r1 = EpisodicRecord::builder()
.content("Machine learning is a subset of artificial intelligence")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let novel_emb = rand_vec(999);
let r2 = EpisodicRecord::builder()
.content("The recipe for chocolate souffle requires precise temperature")
.agent_id(agent())
.embedding(novel_emb)
.build()
.unwrap();
let id2 = db.episodic().remember(r2).await.unwrap();
let recalled = db
.recall_view()
.query(rand_vec(999))
.limit(10)
.execute()
.await
.unwrap();
let novel = recalled.iter().find(|r| r.record.id() == id2).unwrap();
let importance = match &novel.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance >= 0.5,
"Novel content should retain default importance, got {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn prospective_indexing_stores_implications() {
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content(
"The new quantum computing paper demonstrates 1000-qubit entanglement breakthrough",
)
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let _id = db.episodic().remember(r).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn svo_extraction_writes_events() {
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content("Alice deployed version 2.3 on 2026-03-15. Bob reviewed the pull request.")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let _id = db.episodic().remember(r).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn svo_extraction_no_events_for_non_temporal() {
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content("General knowledge about neural networks")
.agent_id(agent())
.embedding(rand_vec(100))
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn interference_tracker_triggers_on_high_similarity() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
for i in 0..10 {
let r = EpisodicRecord::builder()
.content(format!(
"The quick brown fox jumps over the lazy dog version {i}"
))
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
}
#[tokio::test(flavor = "multi_thread")]
async fn interference_no_trigger_on_diverse_content() {
let (db, _dir) = rpe_db().await;
for i in 0..10 {
let r = EpisodicRecord::builder()
.content(format!("Unique topic number {i} about different subjects"))
.agent_id(agent())
.embedding(rand_vec(i as u128 * 1000))
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
}
#[tokio::test(flavor = "multi_thread")]
async fn remember_succeeds_without_embedder() {
let (db, _dir) = rpe_db().await;
let r = EpisodicRecord::builder()
.content("Content that needs embedding but embedder is unavailable")
.agent_id(agent())
.build()
.unwrap();
let _id = db.episodic().remember(r).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn remember_with_embedding_bypasses_embed_step() {
let (db, _dir) = rpe_db().await;
let r = EpisodicRecord::builder()
.content("Pre-embedded content")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let _id = db.episodic().remember(r).await.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_survives_without_embedder() {
let (db, _dir) = rpe_db().await;
let records = vec![
EpisodicRecord::builder()
.content("Record A with embedding")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap(),
EpisodicRecord::builder()
.content("Record B without embedding")
.agent_id(agent())
.build()
.unwrap(),
];
let results = db.episodic().batch_remember(records).await;
assert!(results[0].is_ok());
assert!(results[1].is_ok());
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_completely_unrelated_gets_high_score() {
let (db, _dir) = rpe_db().await;
let cluster_emb = rand_vec(42);
for i in 0..5 {
let r = EpisodicRecord::builder()
.content(format!("Cluster topic about databases version {i}"))
.agent_id(agent())
.embedding(cluster_emb.clone())
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
let outlier_emb = rand_vec(999_999);
let r = EpisodicRecord::builder()
.content("Completely unrelated content about underwater basket weaving")
.agent_id(agent())
.embedding(outlier_emb.clone())
.build()
.unwrap();
let outlier_id = db.episodic().remember(r).await.unwrap();
let recalled = db
.recall_view()
.query(outlier_emb)
.limit(10)
.execute()
.await
.unwrap();
let outlier = recalled
.iter()
.find(|r| r.record.id() == outlier_id)
.unwrap();
let importance = match &outlier.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance >= 0.5,
"Unrelated content should stay on slow path with default importance, got {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn remember_rejects_wrong_embedding_dimensions() {
let (db, _dir) = rpe_db().await;
let wrong_dim_emb: Vec<f32> = (0..16).map(|i| i as f32 * 0.1).collect();
let r = EpisodicRecord::builder()
.content("Record with wrong embedding dimensions")
.agent_id(agent())
.embedding(wrong_dim_emb)
.build()
.unwrap();
let result = db.episodic().remember(r).await;
assert!(result.is_err(), "Should reject wrong embedding dimensions");
let err_msg = format!("{}", result.unwrap_err());
assert!(
err_msg.contains("dimension mismatch"),
"Error should mention dimension mismatch, got: {err_msg}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_rejects_wrong_embedding_dimensions() {
let (db, _dir) = rpe_db().await;
let wrong_dim_emb: Vec<f32> = (0..16).map(|i| i as f32 * 0.1).collect();
let records = vec![
EpisodicRecord::builder()
.content("Good record")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap(),
EpisodicRecord::builder()
.content("Bad record with wrong dims")
.agent_id(agent())
.embedding(wrong_dim_emb)
.build()
.unwrap(),
];
let results = db.episodic().batch_remember(records).await;
assert!(results[0].is_ok(), "Valid record should succeed");
assert!(results[1].is_err(), "Wrong dims should be rejected");
}
#[tokio::test(flavor = "multi_thread")]
async fn svo_events_written_to_storage() {
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content("Alice deployed version 2.3 on 2026-03-15. Bob reviewed the code.")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
let count = db.storage_backend().count("svo_events", None).await;
match count {
Ok(n) => assert!(n > 0, "SVO events should be written to storage, got {n}"),
Err(_) => {
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn svo_events_written_with_canonical_columns() {
use arrow_array::StringArray;
use hirn_storage::store::ScanOptions;
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content("Alice deployed version 2.4 on 2026-03-15.")
.agent_id(agent())
.embedding(rand_vec(43))
.build()
.unwrap();
let id = db.episodic().remember(r).await.unwrap();
let batches = db
.storage_backend()
.scan("svo_events", ScanOptions::default())
.await
.unwrap();
let batch = batches.iter().find(|batch| batch.num_rows() > 0).unwrap();
assert!(batch.column_by_name("source_memory_id").is_some());
assert!(batch.column_by_name("source_ids_json").is_some());
assert!(batch.column_by_name("time_start").is_some());
assert!(batch.column_by_name("time_start_ms").is_some());
assert!(batch.column_by_name("confidence").is_some());
assert!(batch.column_by_name("namespace").is_some());
let source_col = batch
.column_by_name("source_memory_id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(source_col.value(0), id.to_string());
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_record_has_embedding_and_metadata() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("Initial content about testing")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("Initial content about testing again")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
let id2 = db.episodic().remember(r2).await.unwrap();
let recalled = db
.recall_view()
.query(emb)
.limit(10)
.execute()
.await
.unwrap();
let found = recalled.iter().find(|r| r.record.id() == id2).unwrap();
assert!(found.similarity > 0.0, "Should have non-zero similarity");
match &found.record {
MemoryRecord::Episodic(_) => {
}
_ => panic!("Expected episodic record"),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_zscore_amplifies_after_familiar_history() {
let (db, _dir) = rpe_db().await;
let base_emb = rand_vec(42);
for i in 0..8 {
let r = EpisodicRecord::builder()
.content(format!("Familiar topic variation {i}"))
.agent_id(agent())
.embedding(base_emb.clone())
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
let novel_emb = rand_vec(777_777);
let r = EpisodicRecord::builder()
.content("Completely novel content about deep-sea bioluminescence")
.agent_id(agent())
.embedding(novel_emb.clone())
.build()
.unwrap();
let novel_id = db.episodic().remember(r).await.unwrap();
let recalled = db
.recall_view()
.query(novel_emb)
.limit(10)
.execute()
.await
.unwrap();
let novel_rec = recalled.iter().find(|r| r.record.id() == novel_id).unwrap();
let importance = match &novel_rec.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance >= 0.5,
"Novel content after familiar history should be on slow path, got importance {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_empty_db_novel_content_slow_path() {
let (db, _dir) = rpe_db().await;
let r = EpisodicRecord::builder()
.content("First ever memory in the database")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let id = db.episodic().remember(r).await.unwrap();
let recalled = db
.recall_view()
.query(rand_vec(42))
.limit(10)
.execute()
.await
.unwrap();
let found = recalled.iter().find(|r| r.record.id() == id).unwrap();
let importance = match &found.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance >= 0.5,
"First memory in empty DB should be on slow path, got importance {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_mixed_writes_maintain_correct_routing() {
let (db, _dir) = rpe_db().await;
let emb_a = rand_vec(100);
let emb_b = rand_vec(200);
let r1 = EpisodicRecord::builder()
.content("Machine learning fundamentals")
.agent_id(agent())
.embedding(emb_a.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("Ancient Roman architecture")
.agent_id(agent())
.embedding(emb_b.clone())
.build()
.unwrap();
db.episodic().remember(r2).await.unwrap();
let r3 = EpisodicRecord::builder()
.content("Machine learning fundamentals revisited")
.agent_id(agent())
.embedding(emb_a.clone())
.build()
.unwrap();
let id3 = db.episodic().remember(r3).await.unwrap();
let recalled = db
.recall_view()
.query(emb_a)
.limit(10)
.execute()
.await
.unwrap();
let found = recalled.iter().find(|r| r.record.id() == id3).unwrap();
let importance = match &found.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic record"),
};
assert!(
importance < 0.5,
"Near-duplicate should get fast-path low importance, got {importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_namespace_partitions_isolate_novelty_baselines() {
let (db, _dir) = rpe_db().await;
let ns_a = Namespace::new("tenant-a").unwrap();
let ns_b = Namespace::new("tenant-b").unwrap();
db.episodic()
.remember(episode_in(
Namespace::default(),
"shared seed",
axis_vec(0.0),
))
.await
.unwrap();
let familiar_batch = vec![
episode_in(ns_a, "familiar +0.02", axis_vec(0.02)),
episode_in(ns_a, "familiar +0.04", axis_vec(0.04)),
episode_in(ns_a, "familiar +0.06", axis_vec(0.06)),
episode_in(ns_a, "familiar +0.08", axis_vec(0.08)),
episode_in(ns_a, "familiar +0.10", axis_vec(0.10)),
episode_in(ns_a, "familiar +0.12", axis_vec(0.12)),
];
let familiar_results = db.episodic().batch_remember(familiar_batch).await;
assert!(familiar_results.iter().all(|result| result.is_ok()));
let ns_b_emb = axis_vec(0.4);
let ns_b_id = db
.episodic()
.remember(episode_in(
ns_b,
"tenant-b threshold candidate",
ns_b_emb.clone(),
))
.await
.unwrap();
let mut ns_a_emb = vec![0.0f32; DIM];
ns_a_emb[1] = 1.0;
let ns_a_id = db
.episodic()
.remember(episode_in(
ns_a,
"tenant-a threshold candidate",
ns_a_emb.clone(),
))
.await
.unwrap();
let ns_b_importance = episodic_importance(&db, ns_b_emb, ns_b_id).await;
let ns_a_importance = episodic_importance(&db, ns_a_emb, ns_a_id).await;
assert!(
ns_b_importance < 0.5,
"untrained namespace should keep an independent fast-path baseline, got {ns_b_importance}",
);
assert!(
ns_a_importance >= 0.5,
"trained namespace should retain its familiar-history slow path, got {ns_a_importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_concurrent_partitions_update_independently() {
let (db, _dir) = rpe_db().await;
let db = Arc::new(db);
let ns_low = Namespace::new("work-low").unwrap();
let ns_high = Namespace::new("work-high").unwrap();
let db_low = Arc::clone(&db);
let low_handle = tokio::spawn(async move {
db_low
.episodic()
.batch_remember(vec![
episode_in(ns_low, "low 1", basis_vec(0)),
episode_in(ns_low, "low 2", basis_vec(0)),
episode_in(ns_low, "low 3", basis_vec(0)),
episode_in(ns_low, "low 4", basis_vec(0)),
episode_in(ns_low, "low 5", basis_vec(0)),
episode_in(ns_low, "low 6", basis_vec(0)),
])
.await
});
let db_high = Arc::clone(&db);
let high_handle = tokio::spawn(async move {
db_high
.episodic()
.batch_remember(vec![
episode_in(ns_high, "high 1", basis_vec(2)),
episode_in(ns_high, "high 2", basis_vec(2)),
episode_in(ns_high, "high 3", basis_vec(2)),
episode_in(ns_high, "high 4", basis_vec(2)),
])
.await
});
let low_results = low_handle.await.unwrap();
let high_results = high_handle.await.unwrap();
assert!(low_results.iter().all(|result| result.is_ok()));
assert!(high_results.iter().all(|result| result.is_ok()));
let low_id = db
.episodic()
.remember(episode_in(ns_low, "low partition probe", basis_vec(1)))
.await
.unwrap();
let high_id = db
.episodic()
.remember(episode_in(ns_high, "high partition probe", basis_vec(2)))
.await
.unwrap();
let low_importance = episodic_importance(db.as_ref(), basis_vec(1), low_id).await;
let high_importance = episodic_importance(db.as_ref(), basis_vec(2), high_id).await;
assert!(
low_importance >= 0.5,
"low-distance partition should preserve its amplified slow path, got {low_importance}",
);
assert!(
high_importance < 0.5,
"high-distance partition should keep an independent fast-path baseline, got {high_importance}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_rpe_routes_fast_slow_path() {
let (db, _dir) = rpe_db().await;
let seed_emb = basis_vec(0);
let novel_emb = basis_vec(1);
let seed = EpisodicRecord::builder()
.content("Rust is a systems programming language")
.agent_id(agent())
.embedding(seed_emb.clone())
.build()
.unwrap();
db.episodic().remember(seed).await.unwrap();
let dup = EpisodicRecord::builder()
.content("Rust is a systems programming language, again")
.agent_id(agent())
.embedding(seed_emb.clone())
.build()
.unwrap();
let novel = EpisodicRecord::builder()
.content("The diet of emperor penguins varies seasonally")
.agent_id(agent())
.embedding(novel_emb.clone())
.build()
.unwrap();
let results = db.episodic().batch_remember(vec![dup, novel]).await;
assert_eq!(results.len(), 2);
let id_dup = results[0].as_ref().unwrap();
let id_novel = results[1].as_ref().unwrap();
let recalled = db
.recall_view()
.query(seed_emb)
.limit(10)
.execute()
.await
.unwrap();
let found_dup = recalled.iter().find(|r| r.record.id() == *id_dup).unwrap();
let imp_dup = match &found_dup.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic"),
};
assert!(
imp_dup < 0.5,
"Batch fast-path importance should be < 0.5, got {imp_dup}",
);
let recalled_novel = db
.recall_view()
.query(novel_emb)
.limit(10)
.execute()
.await
.unwrap();
let found_novel = recalled_novel
.iter()
.find(|r| r.record.id() == *id_novel)
.unwrap();
let imp_novel = match &found_novel.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic"),
};
assert!(
imp_novel >= 0.5,
"Batch novel record should retain default importance, got {imp_novel}",
);
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_svo_extraction_runs() {
let (db, _dir) = full_write_path_db().await;
let records: Vec<EpisodicRecord> = vec![
EpisodicRecord::builder()
.content("Alice deployed version 3.0 on 2026-06-01. Bob approved the PR.")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap(),
EpisodicRecord::builder()
.content("Carol merged the feature branch on 2026-06-02.")
.agent_id(agent())
.embedding(rand_vec(2))
.build()
.unwrap(),
];
let results = db.episodic().batch_remember(records).await;
assert!(results.iter().all(|r| r.is_ok()));
let count = db.storage_backend().count("svo_events", None).await;
match count {
Ok(n) => assert!(n > 0, "Batch SVO events should be written, got {n}"),
Err(_) => {
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_prospective_indexing_runs() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("batch_pi");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(2)
.prospective_indexing_templates(vec![
"What changed in {content}?".into(),
"Why does {content} matter?".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
db.set_embedder(Arc::new(PseudoEmbedder::new(DIM)));
let records = vec![
EpisodicRecord::builder()
.content("Alice deployed the API gateway to production")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap(),
EpisodicRecord::builder()
.content("Bob rotated the incident response keys after the outage")
.agent_id(agent())
.embedding(rand_vec(2))
.build()
.unwrap(),
];
let results = db.episodic().batch_remember(records).await;
assert!(results.iter().all(|r| r.is_ok()));
let ids: Vec<_> = results.into_iter().map(Result::unwrap).collect();
let total = db
.storage_backend()
.count("prospective_implications", None)
.await
.unwrap();
assert_eq!(
total, 4,
"Expected 4 implications across the batch, got {total}"
);
for id in ids {
let filter = format!("source_memory_id = '{id}'");
let count = db
.storage_backend()
.count("prospective_implications", Some(&filter))
.await
.unwrap();
assert_eq!(count, 2, "Each record should contribute 2 implications");
}
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_temporal_edges_created() {
let (db, _dir) = rpe_db().await;
let records: Vec<EpisodicRecord> = (0..3)
.map(|i| {
EpisodicRecord::builder()
.content(format!("Event number {i} in the timeline"))
.agent_id(agent())
.embedding(rand_vec(100 + i))
.build()
.unwrap()
})
.collect();
let results = db.episodic().batch_remember(records).await;
let ids: Vec<_> = results.into_iter().map(|r| r.unwrap()).collect();
let graph = db.cached_graph();
let edges_0 = graph.get_edges_between(ids[0], ids[1]).await.unwrap();
let temporal_0 = edges_0
.iter()
.find(|edge| {
edge.source == ids[0]
&& edge.target == ids[1]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
})
.unwrap();
assert!(
matches!(temporal_0.relation, EdgeRelation::TemporalNext),
"Should have TemporalNext edge from record 0 to record 1",
);
assert_eq!(
temporal_0.metadata.get("temporal_basis"),
Some(&MetadataValue::from("arrival_order")),
);
assert_eq!(
temporal_0.metadata.get("temporal_partition"),
Some(&MetadataValue::from("namespace")),
);
assert_eq!(
temporal_0.metadata.get("source_arrival_sequence"),
Some(&MetadataValue::from(1i64)),
);
assert_eq!(
temporal_0.metadata.get("target_arrival_sequence"),
Some(&MetadataValue::from(2i64)),
);
let edges_1 = graph.get_edges_between(ids[1], ids[2]).await.unwrap();
let temporal_1 = edges_1
.iter()
.find(|edge| {
edge.source == ids[1]
&& edge.target == ids[2]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
})
.unwrap();
assert!(
matches!(temporal_1.relation, EdgeRelation::TemporalNext),
"Should have TemporalNext edge from record 1 to record 2",
);
assert_eq!(
temporal_1.metadata.get("temporal_basis"),
Some(&MetadataValue::from("arrival_order")),
);
assert_eq!(
temporal_1.metadata.get("source_arrival_sequence"),
Some(&MetadataValue::from(2i64)),
);
assert_eq!(
temporal_1.metadata.get("target_arrival_sequence"),
Some(&MetadataValue::from(3i64)),
);
}
#[tokio::test(flavor = "multi_thread")]
async fn temporal_next_preserves_arrival_order_when_slow_path_overtakes() {
let (mut db, _dir) = full_write_path_db().await;
let (blocking_embedder, entered_rx) = BlockingEmbedder::new(DIM);
let blocking_embedder = Arc::new(blocking_embedder);
db.set_embedder(blocking_embedder.clone());
let db = Arc::new(db);
let namespace = Namespace::new("temporal-arrival-overtake").unwrap();
let later_event_time = Timestamp::from_millis(2_000);
let earlier_event_time = Timestamp::from_millis(1_000);
let shared_embedding = rand_vec(7_001);
let first_record = episode_at(
namespace,
"later event time arrives first but blocks in slow path",
shared_embedding.clone(),
later_event_time,
);
let second_record = episode_at(
namespace,
"earlier event time arrives second but should stay second",
shared_embedding,
earlier_event_time,
);
let db_first = Arc::clone(&db);
let first_handle =
tokio::spawn(async move { db_first.episodic().remember(first_record).await.unwrap() });
entered_rx.await.unwrap();
let db_second = Arc::clone(&db);
let second_handle =
tokio::spawn(async move { db_second.episodic().remember(second_record).await.unwrap() });
let second_id = timeout(Duration::from_secs(2), second_handle)
.await
.unwrap()
.unwrap();
blocking_embedder.release();
let first_id = first_handle.await.unwrap();
let temporal_edges = db
.cached_graph()
.get_edges_between(first_id, second_id)
.await
.unwrap();
let temporal_edge = temporal_edges
.iter()
.find(|edge| {
edge.source == first_id
&& edge.target == second_id
&& matches!(edge.relation, EdgeRelation::TemporalNext)
})
.unwrap();
assert_eq!(
temporal_edge.metadata.get("temporal_basis"),
Some(&MetadataValue::from("arrival_order")),
);
assert_eq!(
temporal_edge.metadata.get("temporal_partition"),
Some(&MetadataValue::from("namespace")),
);
assert_eq!(
temporal_edge.metadata.get("source_arrival_sequence"),
Some(&MetadataValue::from(1i64)),
);
assert_eq!(
temporal_edge.metadata.get("target_arrival_sequence"),
Some(&MetadataValue::from(2i64)),
);
let reverse_edges = db
.cached_graph()
.get_edges_between(second_id, first_id)
.await
.unwrap();
assert!(reverse_edges.iter().all(|edge| {
!(edge.source == second_id
&& edge.target == first_id
&& matches!(edge.relation, EdgeRelation::TemporalNext))
}));
}
#[tokio::test(flavor = "multi_thread")]
async fn temporal_next_replay_same_write_set_preserves_arrival_chain() {
let (db_a, _dir_a) = rpe_db().await;
let (db_b, _dir_b) = rpe_db().await;
let namespace = Namespace::new("temporal-replay").unwrap();
let records = vec![
episode_at(
namespace,
"step-1",
rand_vec(8_001),
Timestamp::from_millis(3_000),
),
episode_at(
namespace,
"step-2",
rand_vec(8_002),
Timestamp::from_millis(1_000),
),
episode_at(
namespace,
"step-3",
rand_vec(8_003),
Timestamp::from_millis(2_000),
),
];
let ids_a: Vec<_> = db_a
.episodic()
.batch_remember(records.clone())
.await
.into_iter()
.map(|result| result.unwrap())
.collect();
let ids_b: Vec<_> = db_b
.episodic()
.batch_remember(records)
.await
.into_iter()
.map(|result| result.unwrap())
.collect();
for (db, ids) in [(&db_a, &ids_a), (&db_b, &ids_b)] {
let first_edge = db
.cached_graph()
.get_edges_between(ids[0], ids[1])
.await
.unwrap();
assert!(first_edge.iter().any(|edge| {
edge.source == ids[0]
&& edge.target == ids[1]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
&& edge.metadata.get("temporal_basis")
== Some(&MetadataValue::from("arrival_order"))
&& edge.metadata.get("source_arrival_sequence") == Some(&MetadataValue::from(1i64))
&& edge.metadata.get("target_arrival_sequence") == Some(&MetadataValue::from(2i64))
}));
let second_edge = db
.cached_graph()
.get_edges_between(ids[1], ids[2])
.await
.unwrap();
assert!(second_edge.iter().any(|edge| {
edge.source == ids[1]
&& edge.target == ids[2]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
&& edge.metadata.get("temporal_basis")
== Some(&MetadataValue::from("arrival_order"))
&& edge.metadata.get("source_arrival_sequence") == Some(&MetadataValue::from(2i64))
&& edge.metadata.get("target_arrival_sequence") == Some(&MetadataValue::from(3i64))
}));
}
}
#[tokio::test(flavor = "multi_thread")]
async fn temporal_next_ignores_mixed_timestamp_quality_and_uses_arrival_sequence() {
let (db, _dir) = rpe_db().await;
let namespace = Namespace::new("temporal-mixed-quality").unwrap();
let records = vec![
episode_at(
namespace,
"timestamp present first",
rand_vec(8_101),
Timestamp::from_millis(3_000),
),
EpisodicRecord::builder()
.content("timestamp inferred second")
.agent_id(agent())
.namespace(namespace)
.embedding(rand_vec(8_102))
.build()
.unwrap(),
episode_at(
namespace,
"duplicate timestamp third",
rand_vec(8_103),
Timestamp::from_millis(3_000),
),
];
let ids: Vec<_> = db
.episodic()
.batch_remember(records)
.await
.into_iter()
.map(|result| result.unwrap())
.collect();
let first_edge = db
.cached_graph()
.get_edges_between(ids[0], ids[1])
.await
.unwrap();
assert!(first_edge.iter().any(|edge| {
edge.source == ids[0]
&& edge.target == ids[1]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
&& edge.metadata.get("source_arrival_sequence") == Some(&MetadataValue::from(1i64))
&& edge.metadata.get("target_arrival_sequence") == Some(&MetadataValue::from(2i64))
}));
let second_edge = db
.cached_graph()
.get_edges_between(ids[1], ids[2])
.await
.unwrap();
assert!(second_edge.iter().any(|edge| {
edge.source == ids[1]
&& edge.target == ids[2]
&& matches!(edge.relation, EdgeRelation::TemporalNext)
&& edge.metadata.get("source_arrival_sequence") == Some(&MetadataValue::from(2i64))
&& edge.metadata.get("target_arrival_sequence") == Some(&MetadataValue::from(3i64))
}));
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_mixed_dimensions_rejects_invalid() {
let (db, _dir) = rpe_db().await;
let good = EpisodicRecord::builder()
.content("Valid record with correct embedding dimensions")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let bad_emb: Vec<f32> = (0..16).map(|i| i as f32 * 0.1).collect();
let bad = EpisodicRecord::builder()
.content("Invalid record with wrong embedding dimensions")
.agent_id(agent())
.embedding(bad_emb)
.build()
.unwrap();
let results = db.episodic().batch_remember(vec![good, bad]).await;
let successes: Vec<_> = results.iter().filter(|r| r.is_ok()).collect();
let failures: Vec<_> = results.iter().filter(|r| r.is_err()).collect();
assert_eq!(successes.len(), 1, "Good record should succeed");
assert_eq!(failures.len(), 1, "Bad dimension record should fail");
}
#[tokio::test(flavor = "multi_thread")]
async fn empty_prospective_templates_produces_no_implications() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("empty_templates");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0) .prospective_indexing_enabled(true)
.prospective_indexing_num_questions(5)
.prospective_indexing_templates(vec![]) .build()
.unwrap();
let db = HirnDB::open_with_config(config, backend).await.unwrap();
let record = EpisodicRecord::builder()
.content("This should not generate any prospective implications")
.agent_id(agent())
.embedding(rand_vec(99))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
let count = db
.storage_backend()
.count("prospective_implications", None)
.await;
match count {
Ok(n) => assert_eq!(n, 0, "Empty templates should produce 0 implications"),
Err(_) => {} }
_ = id;
}
#[tokio::test(flavor = "multi_thread")]
async fn custom_prospective_templates_applied() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("custom_templates");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0) .prospective_indexing_enabled(true)
.prospective_indexing_num_questions(2)
.prospective_indexing_templates(vec![
"Custom question about {content}".into(),
"Another angle on {content}".into(),
])
.build()
.unwrap();
let db = HirnDB::open_with_config(config, backend).await.unwrap();
let record = EpisodicRecord::builder()
.content("Alice deployed version 2.3 on staging for the release")
.agent_id(agent())
.embedding(rand_vec(200))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
_ = id;
}
#[tokio::test(flavor = "multi_thread")]
async fn prospective_indexing_with_embedder_stores_implications() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("pi_with_embedder");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0) .prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_templates(vec![
"What happened regarding {content}?".into(),
"Who was involved in {content}?".into(),
"Why is {content} important?".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let embedder = Arc::new(PseudoEmbedder::new(DIM));
db.set_embedder(embedder);
let record = EpisodicRecord::builder()
.content("Alice deployed the new microservice to production in eu-west-1")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
_ = id;
let count = db
.storage_backend()
.count("prospective_implications", None)
.await;
match count {
Ok(n) => assert!(
n >= 3,
"Expected at least 3 implications (one per template), got {n}"
),
Err(_) => panic!("prospective_implications dataset should exist after PI write"),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn prospective_implications_have_correct_source_memory_id() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("pi_fk");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(2)
.prospective_indexing_templates(vec![
"Why is {content} relevant?".into(),
"How does {content} affect the system?".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
db.set_embedder(Arc::new(PseudoEmbedder::new(DIM)));
let record = EpisodicRecord::builder()
.content("Bob fixed the database replication lag in production")
.agent_id(agent())
.embedding(rand_vec(99))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
let filter = format!("source_memory_id = '{id}'");
let count = db
.storage_backend()
.count("prospective_implications", Some(&filter))
.await
.unwrap_or(0);
assert!(
count >= 2,
"All implications should have source_memory_id = {id}, found {count}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn prospective_implications_searchable_by_vector() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("pi_search");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config.clone())
.await
.unwrap()
.store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(2)
.prospective_indexing_templates(vec![
"What is {content}?".into(),
"Describe {content}.".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend.clone())
.await
.unwrap();
db.set_embedder(Arc::new(PseudoEmbedder::new(DIM)));
let record = EpisodicRecord::builder()
.content("The load balancer was reconfigured to round-robin")
.agent_id(agent())
.embedding(rand_vec(77))
.build()
.unwrap();
db.episodic().remember(record).await.unwrap();
let query_vec = rand_vec(77); let results = backend
.vector_search(
"prospective_implications",
hirn_storage::store::VectorSearchOptions {
column: "embedding".to_owned(),
query: query_vec,
limit: 5,
..Default::default()
},
)
.await;
match results {
Ok(batches) => {
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
assert!(
total_rows >= 1,
"Prospective implications should be searchable by vector, got {total_rows} rows"
);
}
Err(e) => panic!("Vector search on prospective_implications failed: {e}"),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_skips_prospective_indexing() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("fast_path_no_pi");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.3)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_templates(vec!["Question about {content}?".into()])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
db.set_embedder(Arc::new(PseudoEmbedder::new(DIM)));
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog again")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r2).await.unwrap();
let count = db
.storage_backend()
.count("prospective_implications", None)
.await
.unwrap_or(0);
assert!(
count <= 3,
"Fast-path record should not generate implications, got {count} total"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn consolidation_write_failure_no_orphaned_edges() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("consol_fail");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.build()
.unwrap();
let db = HirnDB::open_with_config(config, backend).await.unwrap();
let r = EpisodicRecord::builder()
.content("A single lonely memory")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
let result = db.admin().consolidate().execute().await.unwrap();
assert!(
result.provenance_edges_created <= result.concepts_extracted,
"Provenance edges ({}) should not exceed concepts ({})",
result.provenance_edges_created,
result.concepts_extracted,
);
let graph = db.cached_graph();
let edges = graph.all_edges().await.unwrap_or_default();
let derived_from_count = edges
.iter()
.filter(|e| matches!(e.relation, EdgeRelation::DerivedFrom))
.count();
assert!(
derived_from_count <= result.concepts_extracted,
"DerivedFrom edges ({derived_from_count}) should not exceed concepts ({})",
result.concepts_extracted,
);
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_no_embedder_calls_for_prospective() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("fast_path_track_embed");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.3)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_templates(vec!["What about {content}?".into()])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let tracker = Arc::new(TrackingEmbedder::new(DIM));
db.set_embedder(tracker.clone());
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let calls_after_first = tracker.call_count();
let r2 = EpisodicRecord::builder()
.content("The quick brown fox jumps over the lazy dog, repeated")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r2).await.unwrap();
let calls_after_second = tracker.call_count();
assert_eq!(
calls_after_first, calls_after_second,
"Fast-path record should not call embedder for prospective indexing (first: {calls_after_first}, second: {calls_after_second})"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_stores_in_under_100ms() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("Baseline content for timing test")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("Baseline content for timing test, variant")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
let start = std::time::Instant::now();
db.episodic().remember(r2).await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() < 500,
"Fast-path store should complete quickly, took {}ms",
elapsed.as_millis()
);
}
#[tokio::test(flavor = "multi_thread")]
async fn near_duplicate_gets_low_importance() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("The capital of France is Paris")
.agent_id(agent())
.importance(0.5)
.embedding(emb.clone())
.build()
.unwrap();
let id1 = db.episodic().remember(r1).await.unwrap();
let r2 = EpisodicRecord::builder()
.content("The capital of France is Paris, confirmed")
.agent_id(agent())
.importance(0.5)
.embedding(emb.clone())
.build()
.unwrap();
let id2 = db.episodic().remember(r2).await.unwrap();
let results = db
.recall_view()
.query(emb)
.limit(10)
.execute()
.await
.unwrap();
let original = results.iter().find(|r| r.record.id() == id1).unwrap();
let duplicate = results.iter().find(|r| r.record.id() == id2).unwrap();
let orig_imp = match &original.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic"),
};
let dup_imp = match &duplicate.record {
MemoryRecord::Episodic(e) => e.importance,
_ => panic!("Expected episodic"),
};
assert!(
dup_imp < orig_imp,
"Duplicate importance ({dup_imp}) should be lower than original ({orig_imp})"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_completes_in_target_latency() {
let (db, _dir) = rpe_db().await;
let emb = rand_vec(42);
let r1 = EpisodicRecord::builder()
.content("Seed content for fast-path timing measurement")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r1).await.unwrap();
let _ = db.recall_view().query(emb.clone()).limit(1).execute().await;
let mut timings = Vec::new();
for i in 0..5 {
let r = EpisodicRecord::builder()
.content(format!("Fast path timing variant {i}"))
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
let start = std::time::Instant::now();
db.episodic().remember(r).await.unwrap();
timings.push(start.elapsed());
}
timings.sort();
let median_ms = timings[timings.len() / 2].as_millis();
eprintln!("Fast-path median latency: {median_ms}ms (timings: {timings:?})");
assert!(
median_ms < 500,
"Fast-path median latency {median_ms}ms exceeds 500ms threshold"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn slow_path_completes_in_target_latency() {
let (db, _dir) = full_write_path_db().await;
let r_warmup = EpisodicRecord::builder()
.content("Warmup content for slow path timing")
.agent_id(agent())
.embedding(rand_vec(1))
.build()
.unwrap();
db.episodic().remember(r_warmup).await.unwrap();
let mut timings = Vec::new();
for i in 0..5 {
let r = EpisodicRecord::builder()
.content(format!(
"Completely unique slow-path topic about area number {i} with novel vocabulary"
))
.agent_id(agent())
.embedding(rand_vec((i + 100) as u128))
.build()
.unwrap();
let start = std::time::Instant::now();
db.episodic().remember(r).await.unwrap();
timings.push(start.elapsed());
}
timings.sort();
let median_ms = timings[timings.len() / 2].as_millis();
eprintln!("Slow-path median latency: {median_ms}ms (timings: {timings:?})");
assert!(
median_ms < 1000,
"Slow-path median latency {median_ms}ms exceeds 1000ms threshold"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn fast_path_faster_than_slow_path() {
let emb = rand_vec(42);
let (db_slow, _dir_slow) = full_write_path_db().await;
let seed = EpisodicRecord::builder()
.content("Seed content for slow-path DB")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db_slow.episodic().remember(seed).await.unwrap();
let slow_start = std::time::Instant::now();
for i in 0..5 {
let r = EpisodicRecord::builder()
.content(format!(
"Novel content item {i} about completely unrelated domain"
))
.agent_id(agent())
.embedding(rand_vec((i + 500) as u128))
.build()
.unwrap();
db_slow.episodic().remember(r).await.unwrap();
}
let slow_elapsed = slow_start.elapsed();
let (db_fast, _dir_fast) = full_write_path_db().await;
let seed2 = EpisodicRecord::builder()
.content("Seed content for fast-path DB")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db_fast.episodic().remember(seed2).await.unwrap();
let fast_start = std::time::Instant::now();
for i in 0..5 {
let r = EpisodicRecord::builder()
.content(format!("Fast path duplicate content variant {i}"))
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db_fast.episodic().remember(r).await.unwrap();
}
let fast_elapsed = fast_start.elapsed();
eprintln!(
"Fast path: {fast_elapsed:?}, Slow path: {slow_elapsed:?}, Ratio: {:.2}x",
slow_elapsed.as_secs_f64() / fast_elapsed.as_secs_f64().max(0.001)
);
assert!(
fast_elapsed.as_secs_f64() < 10.0,
"Fast path took too long: {fast_elapsed:?}"
);
assert!(
slow_elapsed.as_secs_f64() < 10.0,
"Slow path took too long: {slow_elapsed:?}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn slow_path_runs_svo_extraction_for_novel_content() {
let (db, _dir) = full_write_path_db().await;
let r = EpisodicRecord::builder()
.content("Alice deployed version 2.3 on 2026-03-15. Bob reviewed the pull request on 2026-03-16.")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let _id = db.episodic().remember(r).await.unwrap();
let count = db.storage_backend().count("svo_events", None).await;
match count {
Ok(n) => assert!(
n > 0,
"Slow path should run SVO extraction for temporal content, got {n} events"
),
Err(_) => {
}
}
let graph = db.cached_graph();
let nodes = graph.node_count().await.unwrap_or(0);
assert!(nodes >= 1, "Graph should have at least 1 node");
}
struct SlowEmbedder {
inner: PseudoEmbedder,
delay: std::time::Duration,
}
impl SlowEmbedder {
fn new(dims: usize, delay_secs: u64) -> Self {
Self {
inner: PseudoEmbedder::new(dims),
delay: std::time::Duration::from_secs(delay_secs),
}
}
}
#[async_trait::async_trait]
impl hirn_core::embed::Embedder for SlowEmbedder {
async fn embed(
&self,
texts: &[&str],
) -> hirn_core::HirnResult<Vec<hirn_core::embed::Embedding>> {
tokio::time::sleep(self.delay).await;
self.inner.embed(texts).await
}
fn dimensions(&self) -> usize {
self.inner.dimensions()
}
fn model_id(&self) -> &str {
"slow-pseudo"
}
fn max_input_tokens(&self) -> usize {
usize::MAX
}
}
#[tokio::test(flavor = "multi_thread")]
async fn prospective_indexing_timeout_still_stores_memory() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("pi_timeout");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0) .prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_timeout_secs(1) .prospective_indexing_templates(vec![
"What about {content}?".into(),
"Who is involved in {content}?".into(),
"Why does {content} matter?".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let slow = Arc::new(SlowEmbedder::new(DIM, 10));
db.set_embedder(slow);
let record = EpisodicRecord::builder()
.content("Alice deployed the system to production after thorough testing")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
let results = db
.recall_view()
.query(rand_vec(42))
.limit(5)
.execute()
.await
.unwrap();
assert!(
results.iter().any(|r| r.record.id() == id),
"Memory should be stored and recallable despite PI timeout"
);
let count = db
.storage_backend()
.count("prospective_implications", None)
.await
.unwrap_or(0);
assert_eq!(
count, 0,
"PI timeout should result in 0 implications, got {count}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn mock_embedder_all_implications_embedded_and_stored() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("pi_mock_full");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let num_questions = 5;
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.0) .prospective_indexing_enabled(true)
.prospective_indexing_num_questions(num_questions)
.prospective_indexing_templates(vec![
"What happened regarding {content}?".into(),
"Who was involved in {content}?".into(),
"Why is {content} important?".into(),
"When did {content} occur?".into(),
"Where did {content} take place?".into(),
])
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let tracker = Arc::new(TrackingEmbedder::new(DIM));
db.set_embedder(tracker.clone());
let record = EpisodicRecord::builder()
.content("Carol merged the critical security patch into production on Monday morning")
.agent_id(agent())
.embedding(rand_vec(42))
.build()
.unwrap();
let id = db.episodic().remember(record).await.unwrap();
assert!(
tracker.call_count() >= 1,
"Embedder should be called for prospective indexing, got {} calls",
tracker.call_count()
);
let count = db
.storage_backend()
.count("prospective_implications", None)
.await
.unwrap_or(0);
assert_eq!(
count, num_questions as u64,
"Expected {num_questions} implications (one per template), got {count}"
);
let filter = format!("source_memory_id = '{id}'");
let fk_count = db
.storage_backend()
.count("prospective_implications", Some(&filter))
.await
.unwrap_or(0);
assert_eq!(
fk_count, num_questions as u64,
"All {num_questions} implications should reference source memory {id}, got {fk_count}"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn rpe_fast_path_saves_llm_tokens() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("token_savings");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.rpe_enabled(true)
.rpe_fast_path_threshold(0.3)
.rpe_similarity_search_limit(5)
.prospective_indexing_enabled(true)
.prospective_indexing_num_questions(3)
.prospective_indexing_templates(vec![
"What about {content}?".into(),
"Why does {content} matter?".into(),
"How does {content} relate?".into(),
])
.svo_extraction_enabled(true)
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let tracker = Arc::new(TrackingEmbedder::new(DIM));
db.set_embedder(tracker.clone());
let emb = rand_vec(42);
let seed = EpisodicRecord::builder()
.content("Initial knowledge base entry about distributed systems")
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(seed).await.unwrap();
let calls_after_slow = tracker.call_count();
for i in 0..10 {
let r = EpisodicRecord::builder()
.content(format!(
"Initial knowledge base entry about distributed systems, variant {i}"
))
.agent_id(agent())
.embedding(emb.clone())
.build()
.unwrap();
db.episodic().remember(r).await.unwrap();
}
let calls_after_fast = tracker.call_count();
assert_eq!(
calls_after_slow, calls_after_fast,
"Fast-path records should generate zero additional embed calls: slow={calls_after_slow}, after_fast={calls_after_fast}"
);
let fast_path_ratio = 10.0 / 11.0;
assert!(
fast_path_ratio >= 0.7,
"Fast-path ratio {fast_path_ratio:.0}% should be ≥ 70%"
);
}
struct FailingThenSucceedingEmbedder {
inner: PseudoEmbedder,
calls: std::sync::atomic::AtomicUsize,
fail_until: usize,
}
impl FailingThenSucceedingEmbedder {
fn new(dims: usize, fail_until: usize) -> Self {
Self {
inner: PseudoEmbedder::new(dims),
calls: std::sync::atomic::AtomicUsize::new(0),
fail_until,
}
}
}
#[async_trait::async_trait]
impl hirn_core::embed::Embedder for FailingThenSucceedingEmbedder {
async fn embed(
&self,
texts: &[&str],
) -> hirn_core::HirnResult<Vec<hirn_core::embed::Embedding>> {
let call_num = self
.calls
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if call_num < self.fail_until {
Err(hirn_core::HirnError::ProviderError(
"simulated embed failure".into(),
))
} else {
self.inner.embed(texts).await
}
}
fn dimensions(&self) -> usize {
self.inner.dimensions()
}
fn model_id(&self) -> &str {
"failing-then-succeeding"
}
fn max_input_tokens(&self) -> usize {
usize::MAX
}
}
struct FailsSecondBatchChunkEmbedder {
inner: PseudoEmbedder,
calls: std::sync::atomic::AtomicUsize,
}
impl FailsSecondBatchChunkEmbedder {
fn new(dims: usize) -> Self {
Self {
inner: PseudoEmbedder::new(dims),
calls: std::sync::atomic::AtomicUsize::new(0),
}
}
}
#[async_trait::async_trait]
impl hirn_core::embed::Embedder for FailsSecondBatchChunkEmbedder {
async fn embed(
&self,
texts: &[&str],
) -> hirn_core::HirnResult<Vec<hirn_core::embed::Embedding>> {
let call_num = self
.calls
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if call_num == 1 {
Err(hirn_core::HirnError::ProviderError(
"simulated chunk failure".into(),
))
} else {
self.inner.embed(texts).await
}
}
fn dimensions(&self) -> usize {
self.inner.dimensions()
}
fn model_id(&self) -> &str {
"fails-second-batch-chunk"
}
fn max_input_tokens(&self) -> usize {
usize::MAX
}
}
#[tokio::test(flavor = "multi_thread")]
async fn batch_remember_preserves_partial_embed_successes() {
let (mut db, _dir) = rpe_db().await;
let embedder = hirn_provider::BatchingEmbedder::new(
FailsSecondBatchChunkEmbedder::new(DIM),
NonZeroUsize::new(2).unwrap(),
);
db.set_embedder(Arc::new(embedder));
let records = (0..5)
.map(|i| {
EpisodicRecord::builder()
.content(format!("Partial embed record {i}"))
.multi_content(MemoryContent::Text(format!("Partial embed record {i}")))
.agent_id(agent())
.build()
.unwrap()
})
.collect();
let results = db.episodic().batch_remember(records).await;
assert!(results.iter().all(Result::is_ok));
assert_eq!(
db.pending_embed_count(),
2,
"only the failed chunk should be requeued"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn embedder_recovery_processes_pending_embeds() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("embed_retry");
let lance_path = dir.path().join("lance");
let storage_config = HirnDbConfig::local(lance_path.to_str().unwrap());
let backend: Arc<dyn PhysicalStore> = HirnDb::open(storage_config).await.unwrap().store_arc();
let config = HirnConfig::builder()
.db_path(&db_path)
.working_memory_token_limit(1000)
.embedding_dimensions(DIM as u32)
.build()
.unwrap();
let mut db = HirnDB::open_with_config(config, backend).await.unwrap();
let failing = Arc::new(FailingThenSucceedingEmbedder::new(DIM, 100));
db.set_embedder(failing);
let r = EpisodicRecord::builder()
.content("Record that needs embedding but embedder is down")
.multi_content(MemoryContent::Text(
"Record that needs embedding but embedder is down".into(),
))
.agent_id(agent())
.build()
.unwrap();
let id = db.episodic().remember(r).await.unwrap();
assert!(
db.pending_embed_count() > 0,
"Record should be in pending embed queue"
);
let working = Arc::new(PseudoEmbedder::new(DIM));
db.set_embedder(working);
let (succeeded, failed) = db.retry_pending_embeds().await;
assert_eq!(
succeeded, 1,
"One pending embed should succeed: succeeded={succeeded}, failed={failed}"
);
assert_eq!(failed, 0, "No failures after embedder recovery");
assert_eq!(
db.pending_embed_count(),
0,
"Queue should be empty after successful retry"
);
let results = db
.recall_view()
.query(
hirn_provider::PseudoEmbedder::new(DIM)
.embed(&["Record that needs embedding but embedder is down"])
.await
.unwrap()
.into_iter()
.next()
.unwrap()
.vector,
)
.limit(5)
.execute()
.await
.unwrap();
let found = results.iter().any(|r| r.record.id() == id);
assert!(
found,
"Record should be findable by vector search after embed retry"
);
}