#![cfg(test)]
use std::sync::Arc;
use std::time::Duration;
use rusqlite::params;
use solo_core::{Embedding, EmbeddingDtype, Result, VectorIndex};
use crate::recovery::replay_pending_index;
use crate::test_support::{
StubVectorIndex, fixture_embedding, fixture_episode, open_test_db, open_test_db_at,
};
use crate::writer::{WriterActor, WriterSpawn};
/// Builds a multi-threaded Tokio runtime with `threads` worker threads and
/// `enable_all()` applied. Panics if the runtime cannot be built.
fn rt_multi(threads: usize) -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(threads).enable_all();
    builder.build().unwrap()
}
/// Replays 10 000 queued `pending_index` rows through `replay_pending_index`
/// and checks the report counters, the stub's add count, that the queue is
/// empty afterwards, and that the replay finished within 30 seconds.
///
/// The fixture rows are written in a single transaction. Both INSERT
/// statements are prepared once and the zero-filled embedding blob is
/// allocated once, so the (untimed) setup does not re-parse SQL or
/// re-allocate on every iteration. Only the replay call itself is timed.
#[test]
fn ten_thousand_pending_rows_replay_within_budget() {
    let (mut conn, _tmp) = open_test_db();
    let n = 10_000usize;
    let dim = 4usize;
    let now_ms = chrono::Utc::now().timestamp_millis();
    // Loop-invariant payload shared by every pending row.
    // NOTE(review): `dim * 4` presumably means four bytes per f32 component — confirm.
    let zeros = vec![0u8; dim * 4];

    let tx = conn.transaction().unwrap();
    {
        // Scoped so both statements are dropped before `commit` consumes `tx`.
        let mut insert_episode = tx
            .prepare(
                "INSERT INTO episodes (
memory_id, ts_ms, source_type, source_id, content,
encoding_context_json, provenance_json, confidence,
strength, salience, tier, created_at_ms, updated_at_ms
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            )
            .unwrap();
        let mut insert_pending = tx
            .prepare(
                "INSERT INTO pending_index (memory_id, embedding, embedding_dim, enqueued_at)
VALUES (?, ?, ?, ?)",
            )
            .unwrap();

        for i in 0..n {
            let ep = fixture_episode(&format!("p{i}"));
            let memory_id = ep.memory_id.to_string();
            insert_episode
                .execute(params![
                    memory_id,
                    ep.ts_ms,
                    ep.source_type,
                    ep.source_id,
                    ep.content,
                    "{}",
                    Option::<String>::None,
                    ep.confidence.0,
                    0.5f32,
                    0.5f32,
                    "hot",
                    now_ms,
                    now_ms,
                ])
                .unwrap();
            insert_pending
                .execute(params![memory_id, &zeros[..], dim as i64, 0i64])
                .unwrap();
        }
    }
    tx.commit().unwrap();

    let stub = StubVectorIndex::new(dim);
    let started = std::time::Instant::now();
    let report = replay_pending_index(&mut conn, &stub).unwrap();
    let elapsed = started.elapsed();

    assert_eq!(report.rows_seen, n);
    assert_eq!(report.rows_replayed, n);
    assert_eq!(report.rows_failed, 0);
    assert_eq!(stub.add_count(), n);

    // The queue must be fully drained once the replay has succeeded.
    let remaining: i64 = conn
        .query_row("SELECT COUNT(*) FROM pending_index", [], |r| r.get(0))
        .unwrap();
    assert_eq!(remaining, 0);
    assert!(
        elapsed < Duration::from_secs(30),
        "10k pending replay took {elapsed:?} (budget 30s per ADR-0003)"
    );
}
/// With the stub index configured to fail `save`, `save_snapshot` must hand
/// the error back to the caller, a following `remember` must still succeed,
/// and the writer thread must join cleanly afterwards.
#[test]
fn snapshot_failure_does_not_crash_writer() {
    let (conn, _tmp) = open_test_db();
    let index = Arc::new(StubVectorIndex::new(4));
    index.set_save_fails(true);

    let WriterSpawn { handle, join } = WriterActor::spawn_with_snapshot_dir(
        conn,
        index.clone(),
        std::path::PathBuf::from("/dev/null"),
    );

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let err = handle.save_snapshot().await.unwrap_err();
        let rendered = err.to_string();
        assert!(rendered.contains("stub configured to fail"), "got: {err}");

        // Only success matters here; the returned id is not inspected.
        let _ = handle
            .remember(fixture_episode("post-fail"), fixture_embedding(4))
            .await
            .unwrap();
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");

    // Exactly one save attempt and one add reached the stub.
    assert_eq!(index.save_count(), 1);
    assert_eq!(index.add_count(), 1);
}
/// Spawns a writer with channel capacity 2 over a stub whose `add` sleeps
/// 50 ms, issues five sequential `remember` calls, and checks that the burst
/// took at least 200 ms and that all five adds reached the stub.
#[test]
fn write_channel_full_blocks_caller_then_drains() {
    let (conn, _tmp) = open_test_db();
    let index = Arc::new(StubVectorIndex::new(4));
    index.set_add_sleep(Some(Duration::from_millis(50)));

    let WriterSpawn { handle, join } = WriterActor::spawn_with_capacity(conn, index.clone(), 2);
    let runtime = rt_multi(2);

    let clock = std::time::Instant::now();
    runtime.block_on(async {
        for i in 0..5 {
            let episode = fixture_episode(&format!("burst-{i}"));
            let embedding = fixture_embedding(4);
            handle.remember(episode, embedding).await.unwrap();
        }
    });
    let elapsed = clock.elapsed();

    let floor = Duration::from_millis(200);
    assert!(
        elapsed >= floor,
        "5 slow writes finished in {elapsed:?}; expected ≥ 200ms"
    );
    assert_eq!(index.add_count(), 5);

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// Spawns a writer with channel capacity 1 over a stub whose `add` sleeps
/// 200 ms, performs three sequential `remember` calls, and checks the total
/// time lies in `[500ms, 2s)` and that three adds reached the stub.
#[test]
fn very_slow_hnsw_add_does_not_deadlock() {
    let (conn, _tmp) = open_test_db();
    let index = Arc::new(StubVectorIndex::new(4));
    index.set_add_sleep(Some(Duration::from_millis(200)));

    let WriterSpawn { handle, join } = WriterActor::spawn_with_capacity(conn, index.clone(), 1);
    let runtime = rt_multi(2);

    let clock = std::time::Instant::now();
    runtime.block_on(async {
        for i in 0..3 {
            let episode = fixture_episode(&format!("slow-{i}"));
            let embedding = fixture_embedding(4);
            handle.remember(episode, embedding).await.unwrap();
        }
    });
    let elapsed = clock.elapsed();

    // Lower bound: the sleeps actually happened. Upper bound: nothing hung.
    let (lower, upper) = (Duration::from_millis(500), Duration::from_secs(2));
    assert!(elapsed >= lower, "expected ≥ 500ms, got {elapsed:?}");
    assert!(
        elapsed < upper,
        "expected < 2s, got {elapsed:?} (deadlock?)"
    );
    assert_eq!(index.add_count(), 3);

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// Spawns 200 `remember` tasks against one writer and issues 50 count
/// queries through a `ReaderPool` on the same database file; every write and
/// every read must succeed.
#[test]
fn high_concurrency_reads_and_writes_complete_without_sqlite_busy() {
    use crate::reader::ReaderPool;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("stress.db");
    // Open the database once and drop that connection straight away, before
    // the writer opens its own.
    drop(open_test_db_at(&path));

    let stub: Arc<dyn VectorIndex + Send + Sync> = Arc::new(StubVectorIndex::new(4));
    let writer_conn = open_test_db_at(&path);
    let WriterSpawn { handle, join } =
        WriterActor::spawn_with_capacity(writer_conn, stub.clone(), 1024);

    let runtime = rt_multi(4);
    runtime.block_on(async {
        let pool = ReaderPool::new(&path, None, stub.clone()).unwrap();

        // Writes are spawned onto the runtime and start running immediately.
        let writes: Vec<_> = (0..200)
            .map(|i| {
                let writer = handle.clone();
                tokio::spawn(async move {
                    writer
                        .remember(
                            fixture_episode(&format!("stress-{i}")),
                            fixture_embedding(4),
                        )
                        .await
                })
            })
            .collect();

        // NOTE(review): these read futures are only awaited after every write
        // below has completed; if `interact` is lazy they may not overlap the
        // writes at all — confirm against ReaderPool.
        let reads: Vec<_> = (0..50)
            .map(|_| {
                pool.interact(|conn| {
                    conn.query_row("SELECT COUNT(*) FROM episodes", [], |row| {
                        row.get::<_, i64>(0)
                    })
                })
            })
            .collect();

        for write in writes {
            write.await.unwrap().expect("write must succeed");
        }
        for read in reads {
            let _: i64 = read.await.expect("read must not surface SQLITE_BUSY");
        }
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// Remembering two episodes that share one `memory_id` must succeed the first
/// time and fail the second time with a message mentioning "unique" or
/// "constraint"; only the first write may reach the index.
#[test]
fn duplicate_memory_id_is_rejected_with_clean_state() {
    let (conn, _tmp) = open_test_db();
    let index = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, index.clone());
    let runtime = rt_multi(2);
    let mid = solo_core::MemoryId::new();

    runtime.block_on(async {
        let mut original = fixture_episode("first");
        original.memory_id = mid;
        handle
            .remember(original, fixture_embedding(4))
            .await
            .expect("first remember succeeds");

        let mut duplicate = fixture_episode("dup");
        duplicate.memory_id = mid;
        let msg = handle
            .remember(duplicate, fixture_embedding(4))
            .await
            .expect_err("duplicate memory_id must fail")
            .to_string();

        // Case-insensitive match on the constraint wording.
        let lowered = msg.to_lowercase();
        let mentions_constraint = lowered.contains("unique") || lowered.contains("constraint");
        assert!(
            mentions_constraint,
            "expected unique-constraint message, got: {msg}"
        );
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
    assert_eq!(index.add_count(), 1);
}
/// Remembers one episode, forgets it, then forgets it a second time; all
/// three calls must return `Ok` and the writer must shut down cleanly.
#[test]
fn forget_after_remember_is_consistent() {
    let (conn, _tmp) = open_test_db();
    let index = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, index);
    let runtime = rt_multi(2);

    runtime.block_on(async {
        let episode = fixture_episode("to forget");
        let target = episode.memory_id;

        let remembered = handle.remember(episode, fixture_embedding(4)).await;
        remembered.expect("remember succeeds");

        let first_forget = handle.forget(target, "test".into()).await;
        first_forget.expect("forget succeeds");

        // Forgetting the same id again must not be an error.
        let second_forget = handle.forget(target, "test".into()).await;
        second_forget.expect("re-forget is Ok (idempotent)");
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// Registers an embedder, spawns the writer with that embedder id via
/// `spawn_full`, remembers one episode, and checks that exactly one
/// `embeddings` row exists for it with the expected embedder id, dim and dtype.
#[test]
fn remember_persists_to_embeddings_when_embedder_id_is_set() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup_conn = open_test_db_at(&path);
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&path);
    let index = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn_full(
        writer_conn,
        index.clone(),
        tmp.path().to_path_buf(),
        embedder_id,
    );

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let episode = fixture_episode("with-embeddings-row");
        let mid = episode.memory_id;
        handle
            .remember(episode, fixture_embedding(4))
            .await
            .expect("remember");

        // Inspect the result through a separate connection to the same file.
        let reader = open_test_db_at(&path);
        let key = mid.to_string();

        let n: i64 = reader
            .query_row(
                "SELECT COUNT(*) FROM embeddings WHERE memory_id = ?",
                rusqlite::params![key],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(n, 1, "embeddings row missing for {mid}");

        let stored: (i64, i64, String) = reader
            .query_row(
                "SELECT embedder_id, dim, dtype FROM embeddings WHERE memory_id = ?",
                rusqlite::params![key],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
            )
            .unwrap();
        let (eid_stored, dim_stored, dtype_stored) = stored;
        assert_eq!(eid_stored, embedder_id);
        assert_eq!(dim_stored, 4);
        assert_eq!(dtype_stored, "f32");
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// A writer created through `WriterActor::spawn` must not write an
/// `embeddings` row when it remembers an episode.
///
/// The database lives at a known path so that, once the writer has been shut
/// down, a second connection can verify the absence of the row. Previously
/// this test only checked that `remember` returned `Ok`, so it could not fail
/// if an `embeddings` row were written.
///
/// NOTE(review): assumes `WriterActor::spawn` configures no embedder id, as
/// the test name states — confirm against the writer module.
#[test]
fn remember_skips_embeddings_when_embedder_id_is_none() {
    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let conn = open_test_db_at(&path);
    let stub = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, stub.clone());
    let runtime = rt_multi(2);

    let mid = runtime.block_on(async {
        let ep = fixture_episode("without-eid");
        let mid = ep.memory_id;
        handle
            .remember(ep, fixture_embedding(4))
            .await
            .expect("remember");
        mid
    });

    // Shut the writer down before inspecting the database.
    drop(handle);
    join.join().expect("writer thread joined cleanly");

    // The vector index still receives the add ...
    assert_eq!(stub.add_count(), 1);

    // ... but no row may have been persisted to `embeddings`.
    let read_conn = open_test_db_at(&path);
    let n: i64 = read_conn
        .query_row(
            "SELECT COUNT(*) FROM embeddings WHERE memory_id = ?",
            rusqlite::params![mid.to_string()],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(n, 0, "unexpected embeddings row for {mid}");
}
/// After one `remember` the stub must report one add and zero removes; after
/// a `forget` of the same id it must report one remove.
#[test]
fn forget_tombstones_the_hnsw_at_runtime() {
    let (conn, _tmp) = open_test_db();
    let index = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, index.clone());
    let runtime = rt_multi(2);

    runtime.block_on(async {
        let episode = fixture_episode("to forget at runtime");
        let target = episode.memory_id;

        let remembered = handle.remember(episode, fixture_embedding(4)).await;
        remembered.expect("remember");
        assert_eq!(index.add_count(), 1);
        assert_eq!(index.remove_count(), 0);

        let forgotten = handle.forget(target, "test".into()).await;
        forgotten.expect("forget");
        assert_eq!(index.remove_count(), 1);
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
}
/// Writes once through a clone of the handle, drops both clones, writes again
/// through the original handle, then drops it and joins the writer. Both
/// writes must succeed and both adds must reach the stub.
#[test]
fn multiple_clones_keep_actor_alive_until_last_drop() {
    let (conn, _tmp) = open_test_db();
    let index = std::sync::Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, index.clone());

    let second = handle.clone();
    let third = second.clone();
    let runtime = rt_multi(2);

    runtime.block_on(async {
        let via_clone = third
            .remember(fixture_episode("via clone"), fixture_embedding(4))
            .await;
        via_clone.expect("write through clone");
    });

    // Release the clones; the original handle is still held.
    drop(second);
    drop(third);

    runtime.block_on(async {
        let via_original = handle
            .remember(fixture_episode("after partial drop"), fixture_embedding(4))
            .await;
        via_original.expect("write after dropping clones");
    });

    drop(handle);
    join.join().expect("writer thread joined cleanly");
    assert_eq!(index.add_count(), 2);
}
/// Passing an `F16` embedding to `remember` must fail with an error whose
/// message contains "HNSW expects F32".
///
/// The writer thread is joined before the assertions, as in the sibling
/// tests; previously its join handle was discarded (`join: _`), so the
/// thread was never joined.
#[test]
fn writer_rejects_non_f32_embedding_with_clear_error() {
    let (conn, _tmp) = open_test_db();
    let stub = Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } = WriterActor::spawn(conn, stub);

    // 4 components at 2 bytes each: an F16-tagged payload.
    let bad = Embedding {
        dtype: EmbeddingDtype::F16,
        dim: 4,
        data: vec![0u8; 4 * 2],
    };

    let runtime = rt_multi(1);
    let res: Result<solo_core::MemoryId> = runtime.block_on(async {
        handle.remember(fixture_episode("non-f32"), bad).await
    });

    // Shut down first so the thread is joined even if an assertion below fails.
    drop(handle);
    join.join().expect("writer thread joined cleanly");

    let err = res.unwrap_err();
    assert!(
        err.to_string().contains("HNSW expects F32"),
        "got: {err}"
    );
}
/// Remembers one fixture episode per entry of `contents` through a
/// short-lived writer spawned with `old_embedder_id`, then shuts that writer
/// down.
///
/// Returns the memory ids of the seeded episodes in input order. Panics if a
/// `remember` call fails or the writer thread does not join cleanly.
fn seed_episodes_under_embedder(
    path: &std::path::Path,
    snapshot_dir: &std::path::Path,
    old_embedder_id: i64,
    contents: &[&str],
) -> Vec<solo_core::MemoryId> {
    let conn = open_test_db_at(path);
    let index = Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } =
        WriterActor::spawn_full(conn, index, snapshot_dir.to_path_buf(), old_embedder_id);

    let runtime = rt_multi(2);
    let mut seeded = Vec::with_capacity(contents.len());
    runtime.block_on(async {
        for &text in contents {
            let episode = fixture_episode(text);
            // Record the id before the episode is moved into `remember`.
            seeded.push(episode.memory_id);
            handle
                .remember(episode, fixture_embedding(4))
                .await
                .expect("seed remember");
        }
    });

    drop(handle);
    join.join().expect("seed writer joined cleanly");
    seeded
}
/// Seeds three episodes under an old embedder id, runs `reembed` with the
/// default scope under a new embedder id, and checks that three new rows were
/// added while the three old rows remain (six in total).
#[test]
fn reembed_inserts_new_rows_without_gc() {
    use crate::embedder::StubEmbedder;
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ReembedScope;
    use solo_core::Embedder;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let snap = tmp.path().to_path_buf();

    let old_identity = EmbedderIdentity {
        name: "stub-old".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let new_identity = EmbedderIdentity {
        name: "stub-new".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    // Register the old identity first, then the new one.
    let (old_id, new_id) = {
        let conn = open_test_db_at(&path);
        let old = get_or_insert_embedder_id(&conn, &old_identity).unwrap();
        let new = get_or_insert_embedder_id(&conn, &new_identity).unwrap();
        (old, new)
    };

    let _mids = seed_episodes_under_embedder(&path, &snap, old_id, &["alpha", "beta", "gamma"]);

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let index = Arc::new(StubVectorIndex::new(4));
        let embedder: Arc<dyn Embedder> = Arc::new(StubEmbedder::new("stub-new", "v1", 4));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder(conn, index, snap.clone(), new_id, embedder);

        let report = handle
            .reembed(ReembedScope::default())
            .await
            .expect("reembed dispatch");
        assert_eq!(report.rows_seen, 3, "all 3 stale memories selected");
        assert_eq!(report.rows_reembedded, 3);
        assert_eq!(report.rows_failed, 0);
        assert_eq!(report.rows_gc_deleted, 0);
        assert!(!report.dry_run);

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let rows_for = |embedder: i64| -> i64 {
        read.query_row(
            "SELECT COUNT(*) FROM embeddings WHERE embedder_id = ?",
            rusqlite::params![embedder],
            |row| row.get(0),
        )
        .unwrap()
    };

    let total: i64 = read
        .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
        .unwrap();
    assert_eq!(total, 6, "without --gc, old rows are retained");
    assert_eq!(rows_for(new_id), 3);
    assert_eq!(rows_for(old_id), 3);
}
/// Seeds two episodes under an old embedder id, runs `reembed` with
/// `gc: true` under a new embedder id, and checks that two rows were
/// re-embedded, two stale rows were deleted, and only one embedder id remains
/// in `embeddings`.
#[test]
fn reembed_with_gc_drops_stale_rows() {
    use crate::embedder::StubEmbedder;
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ReembedScope;
    use solo_core::Embedder;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let snap = tmp.path().to_path_buf();

    let old_identity = EmbedderIdentity {
        name: "stub-old".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let new_identity = EmbedderIdentity {
        name: "stub-new".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    // Register the old identity first, then the new one.
    let (old_id, new_id) = {
        let conn = open_test_db_at(&path);
        let old = get_or_insert_embedder_id(&conn, &old_identity).unwrap();
        let new = get_or_insert_embedder_id(&conn, &new_identity).unwrap();
        (old, new)
    };

    let _mids = seed_episodes_under_embedder(&path, &snap, old_id, &["a", "b"]);

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let index = Arc::new(StubVectorIndex::new(4));
        let embedder: Arc<dyn Embedder> = Arc::new(StubEmbedder::new("stub-new", "v1", 4));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder(conn, index, snap.clone(), new_id, embedder);

        let scope = ReembedScope {
            from: None,
            dry_run: false,
            gc: true,
        };
        let report = handle.reembed(scope).await.expect("reembed dispatch");
        assert_eq!(report.rows_seen, 2);
        assert_eq!(report.rows_reembedded, 2);
        assert_eq!(report.rows_gc_deleted, 2, "two stale rows DELETEd");

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let total: i64 = read
        .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
        .unwrap();
    assert_eq!(total, 2, "only the new rows remain");

    let only_current: i64 = read
        .query_row(
            "SELECT COUNT(DISTINCT embedder_id) FROM embeddings",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(only_current, 1);
}
/// Seeds two episodes under an old embedder id, runs `reembed` with
/// `dry_run: true` under a new embedder id, and checks that two rows were
/// seen, none were re-embedded, and `embeddings` holds no row for the new id.
#[test]
fn reembed_dry_run_writes_nothing() {
    use crate::embedder::StubEmbedder;
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ReembedScope;
    use solo_core::Embedder;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let snap = tmp.path().to_path_buf();

    let old_identity = EmbedderIdentity {
        name: "stub-old".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let new_identity = EmbedderIdentity {
        name: "stub-new".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    // Register the old identity first, then the new one.
    let (old_id, new_id) = {
        let conn = open_test_db_at(&path);
        let old = get_or_insert_embedder_id(&conn, &old_identity).unwrap();
        let new = get_or_insert_embedder_id(&conn, &new_identity).unwrap();
        (old, new)
    };

    let _mids = seed_episodes_under_embedder(&path, &snap, old_id, &["x", "y"]);

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let index = Arc::new(StubVectorIndex::new(4));
        let embedder: Arc<dyn Embedder> = Arc::new(StubEmbedder::new("stub-new", "v1", 4));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder(conn, index, snap.clone(), new_id, embedder);

        let scope = ReembedScope {
            from: None,
            dry_run: true,
            gc: false,
        };
        let report = handle.reembed(scope).await.expect("reembed dispatch");
        assert_eq!(report.rows_seen, 2);
        assert_eq!(report.rows_reembedded, 0, "dry-run writes nothing");
        assert!(report.dry_run);

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let total: i64 = read
        .query_row("SELECT COUNT(*) FROM embeddings", [], |row| row.get(0))
        .unwrap();
    assert_eq!(total, 2);

    let with_new: i64 = read
        .query_row(
            "SELECT COUNT(*) FROM embeddings WHERE embedder_id = ?",
            rusqlite::params![new_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(with_new, 0);
}
/// Seeds one episode under an old embedder id and runs `reembed` with
/// `gc: true` twice under a new id: the first pass must re-embed one row, the
/// second must see no candidates and re-embed nothing.
#[test]
fn reembed_is_idempotent_when_run_twice() {
    use crate::embedder::StubEmbedder;
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ReembedScope;
    use solo_core::Embedder;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let snap = tmp.path().to_path_buf();

    let old_identity = EmbedderIdentity {
        name: "stub-old".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let new_identity = EmbedderIdentity {
        name: "stub-new".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    // Register the old identity first, then the new one.
    let (old_id, new_id) = {
        let conn = open_test_db_at(&path);
        let old = get_or_insert_embedder_id(&conn, &old_identity).unwrap();
        let new = get_or_insert_embedder_id(&conn, &new_identity).unwrap();
        (old, new)
    };

    let _mids = seed_episodes_under_embedder(&path, &snap, old_id, &["only"]);

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let index = Arc::new(StubVectorIndex::new(4));
        let embedder: Arc<dyn Embedder> = Arc::new(StubEmbedder::new("stub-new", "v1", 4));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder(conn, index, snap.clone(), new_id, embedder);

        // Same scope for both passes: gc on, every other field defaulted.
        let gc_scope = || ReembedScope {
            gc: true,
            ..Default::default()
        };

        let first = handle.reembed(gc_scope()).await.unwrap();
        assert_eq!(first.rows_reembedded, 1);

        let second = handle.reembed(gc_scope()).await.unwrap();
        assert_eq!(second.rows_seen, 0, "no candidates remain after first pass");
        assert_eq!(second.rows_reembedded, 0);

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });
}
/// Builds a hot-tier `user_message` episode with a fresh memory id, the given
/// timestamp and content, confidence 0.9, and strength/salience of 0.5.
fn ep_at(ts_ms: i64, content: &str) -> solo_core::Episode {
    let memory_id = solo_core::MemoryId::new();
    let encoding_context = solo_core::EncodingContext::default();
    let confidence = solo_core::Confidence::new(0.9).unwrap();
    solo_core::Episode {
        memory_id,
        ts_ms,
        source_type: "user_message".into(),
        source_id: None,
        content: content.into(),
        encoding_context,
        provenance: None,
        confidence,
        strength: 0.5,
        salience: 0.5,
        tier: solo_core::Tier::Hot,
    }
}
/// Builds an `F32` embedding of length `dim` from sparse `(index, value)`
/// pairs, scaled to unit L2 norm.
///
/// Positions not listed in `components` are zero. An all-zero vector is left
/// unscaled. Panics if any index is `>= dim`.
fn unit_emb(dim: usize, components: &[(usize, f32)]) -> Embedding {
    let mut values = vec![0.0f32; dim];
    for &(slot, weight) in components {
        values[slot] = weight;
    }

    let norm = values.iter().map(|x| x * x).sum::<f32>().sqrt();
    // Skip the division for the zero vector.
    if norm > 0.0 {
        values.iter_mut().for_each(|x| *x /= norm);
    }

    let data = bytemuck::cast_slice(&values).to_vec();
    Embedding {
        dtype: EmbeddingDtype::F32,
        dim,
        data,
    }
}
/// Remembers six episodes whose embeddings form two tight groups (three
/// around axis 0, three around axis 2), runs `consolidate`, and checks that
/// two clusters covering all six episodes are reported and persisted, each
/// with a non-NULL centroid.
#[test]
fn consolidate_clusters_two_themes_into_two_persisted_clusters() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let snap = tmp.path().to_path_buf();
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup_conn = open_test_db_at(&path);
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&path);
    let index = Arc::new(StubVectorIndex::new(dim));
    let WriterSpawn { handle, join } =
        WriterActor::spawn_full(writer_conn, index.clone(), snap, embedder_id);

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let day_a = 1_700_000_000_000i64;
        // (offset from day_a in ms, content, sparse embedding components)
        let sample = |offset_ms: i64, label: &str, axes: &[(usize, f32)]| {
            (ep_at(day_a + offset_ms, label), unit_emb(dim, axes))
        };
        let inputs = [
            sample(0, "a1", &[(0, 1.0)]),
            sample(1000, "a2", &[(0, 0.99), (1, 0.01)]),
            sample(2000, "a3", &[(0, 0.98), (1, 0.02)]),
            sample(3000, "b1", &[(2, 1.0)]),
            sample(4000, "b2", &[(2, 0.99), (3, 0.01)]),
            sample(5000, "b3", &[(2, 0.98), (3, 0.02)]),
        ];
        for (episode, embedding) in &inputs {
            handle
                .remember(episode.clone(), embedding.clone())
                .await
                .expect("remember");
        }

        let report = handle
            .consolidate(ConsolidationScope::default())
            .await
            .expect("consolidate");
        assert_eq!(report.episodes_seen, 6);
        assert_eq!(report.clusters_built, 2);
        assert_eq!(report.episodes_clustered, 6);
        assert_eq!(report.abstractions_built, 0);
        assert_eq!(report.contradictions_found, 0);

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let n_clusters: i64 = read
        .query_row("SELECT COUNT(*) FROM clusters", [], |row| row.get(0))
        .unwrap();
    assert_eq!(n_clusters, 2);

    let n_links: i64 = read
        .query_row("SELECT COUNT(*) FROM cluster_episodes", [], |row| row.get(0))
        .unwrap();
    assert_eq!(n_links, 6);

    let with_centroid: i64 = read
        .query_row(
            "SELECT COUNT(*) FROM clusters WHERE centroid IS NOT NULL",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(with_centroid, 2);
}
/// Running `consolidate` on a database with no remembered episodes must
/// report zero episodes seen, zero clusters built and zero episodes clustered.
#[test]
fn consolidate_no_op_when_no_episodes() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: 4,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup_conn = open_test_db_at(&path);
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&path);
    let index = Arc::new(StubVectorIndex::new(4));
    let WriterSpawn { handle, join } =
        WriterActor::spawn_full(writer_conn, index, tmp.path().to_path_buf(), embedder_id);

    let runtime = rt_multi(1);
    runtime.block_on(async {
        let outcome = handle.consolidate(ConsolidationScope::default()).await;
        let report = outcome.unwrap();
        assert_eq!(report.episodes_seen, 0);
        assert_eq!(report.clusters_built, 0);
        assert_eq!(report.episodes_clustered, 0);
    });

    drop(handle);
    join.join().unwrap();
}
/// With a steward backed by a stub LLM that returns one canned abstraction
/// carrying two triples, consolidating three identical-embedding episodes must
/// build one cluster, one abstraction and two triples, and persist them with
/// the expected cluster link, provenance and object kinds.
#[test]
fn consolidate_with_steward_persists_abstractions() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup_conn = open_test_db_at(&path);
        get_or_insert_embedder_id(&setup_conn, &identity).unwrap()
    };

    let runtime = rt_multi(2);

    // Canned LLM reply. The raw string's continuation lines stay at column 0
    // so its bytes are unchanged.
    let canned = r#"{
"content": "Three abstract events about widgets.",
"confidence": 0.8,
"triples": [
{ "subject_id": "Widget", "predicate": "is", "object_id": "thing", "object_kind": "entity" },
{ "subject_id": "Widget", "predicate": "color", "object_id": "blue", "object_kind": "literal" }
]
}"#;
    let llm = Arc::new(StubLlmClient::with_canned("stub-llm", canned));
    let steward_for_writer = Arc::new(Steward::new(llm, StewardConfig::default()));
    let steward_for_assert = steward_for_writer.clone();

    runtime.block_on(async {
        let writer_conn = open_test_db_at(&path);
        let index = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                writer_conn,
                index,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                Some(steward_for_writer),
            );

        // Three episodes one second apart, all with the same unit embedding.
        let day_a = 1_700_000_000_000i64;
        let inputs = [
            (ep_at(day_a, "abstract-1"), unit_emb(dim, &[(0, 1.0)])),
            (ep_at(day_a + 1000, "abstract-2"), unit_emb(dim, &[(0, 1.0)])),
            (ep_at(day_a + 2000, "abstract-3"), unit_emb(dim, &[(0, 1.0)])),
        ];
        for (episode, embedding) in &inputs {
            handle
                .remember(episode.clone(), embedding.clone())
                .await
                .unwrap();
        }

        let report = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(report.clusters_built, 1);
        assert_eq!(
            report.abstractions_built, 1,
            "stub abstract_cluster must have produced one row"
        );
        assert_eq!(
            report.triples_built, 2,
            "the canned response declared 2 triples; both must persist"
        );

        drop(handle);
        // Join on a blocking thread so the async worker is not blocked.
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);

    let n_clusters: i64 = read
        .query_row("SELECT COUNT(*) FROM clusters", [], |row| row.get(0))
        .unwrap();
    assert_eq!(n_clusters, 1);

    let n_abs: i64 = read
        .query_row("SELECT COUNT(*) FROM semantic_abstractions", [], |row| row.get(0))
        .unwrap();
    assert_eq!(n_abs, 1);

    // The abstraction must reference the one persisted cluster.
    let abs_cluster_id: String = read
        .query_row(
            "SELECT cluster_id FROM semantic_abstractions LIMIT 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let cluster_id_in_table: String = read
        .query_row("SELECT cluster_id FROM clusters LIMIT 1", [], |row| row.get(0))
        .unwrap();
    assert_eq!(abs_cluster_id, cluster_id_in_table);

    let prov_json: String = read
        .query_row(
            "SELECT provenance_json FROM semantic_abstractions LIMIT 1",
            [],
            |row| row.get(0),
        )
        .unwrap();
    let prov: serde_json::Value = serde_json::from_str(&prov_json).unwrap();
    assert_eq!(prov["derivation"], "consolidation");
    assert_eq!(prov["by"], "stub-llm");

    let n_triples: i64 = read
        .query_row("SELECT COUNT(*) FROM triples", [], |row| row.get(0))
        .unwrap();
    assert_eq!(n_triples, 2);

    let object_kinds: Vec<String> = {
        let mut by_rowid = read
            .prepare("SELECT object_kind FROM triples ORDER BY rowid")
            .unwrap();
        let kinds = by_rowid
            .query_map([], |row| row.get::<_, String>(0))
            .unwrap();
        kinds.collect::<rusqlite::Result<Vec<_>>>().unwrap()
    };
    let expected_kinds: Vec<String> = ["entity", "literal"]
        .iter()
        .map(|kind| kind.to_string())
        .collect();
    assert_eq!(object_kinds, expected_kinds);

    let _ = steward_for_assert;
}
// Drives three consolidate runs over the same database file:
//   run 1 - three "sam-paris" episodes  -> 1 cluster, 1 abstraction, 1 triple, 0 contradictions
//   run 2 - three "sam-berlin" episodes -> 1 cluster, 1 abstraction, 1 triple, 1 contradiction
//   run 3 - a fresh writer, no new episodes -> 0 episodes seen, 0 contradictions
// and checks that exactly one row exists in `contradictions` after run 2 and
// still after run 3.
#[test]
fn consolidate_with_steward_persists_contradictions_across_runs() {
use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
use crate::writer::ConsolidationScope;
use solo_steward::test_support::StubLlmClient;
use solo_steward::{Steward, StewardConfig};
let tmp = tempfile::TempDir::new().unwrap();
let path = tmp.path().join("test.db");
let dim = 4usize;
// Register the embedder on a short-lived connection that is closed before
// the writer opens its own.
let embedder_id = {
let conn = open_test_db_at(&path);
get_or_insert_embedder_id(
&conn,
&EmbedderIdentity {
name: "stub".into(),
version: "v1".into(),
dim: dim as u32,
dtype: "f32".into(),
},
)
.unwrap()
};
// NOTE(review): `pretend_real_llm(true)` presumably makes the steward treat
// this stub as a real LLM so the contradiction step runs - confirm in
// solo_steward::test_support.
let stub =
Arc::new(StubLlmClient::default_stub().pretend_real_llm(true));
// Three canned replies are queued, in this order: the Paris abstraction,
// the Berlin abstraction, then a positive contradiction verdict.
// NOTE(review): presumably consumed first-in-first-out, one per LLM call - confirm.
stub.push_canned(
r#"{
"content": "Sam settled in Paris.",
"confidence": 0.9,
"triples": [
{ "subject_id": "Sam", "predicate": "lives_in",
"object_id": "Paris", "object_kind": "entity" }
]
}"#,
);
stub.push_canned(
r#"{
"content": "Sam moved to Berlin.",
"confidence": 0.9,
"triples": [
{ "subject_id": "Sam", "predicate": "lives_in",
"object_id": "Berlin", "object_kind": "entity" }
]
}"#,
);
stub.push_canned(
r#"{
"is_contradiction": true,
"kind": "overlapping_single_valued_predicate",
"explanation": "Sam can't live in both Paris and Berlin at the same time."
}"#,
);
let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));
let runtime = rt_multi(2);
// Runs 1 and 2 share a single writer instance.
runtime.block_on(async {
let conn = open_test_db_at(&path);
let stub_idx = Arc::new(StubVectorIndex::new(dim));
let embedder: Arc<dyn solo_core::Embedder> =
Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
let WriterSpawn { handle, join } =
WriterActor::spawn_full_with_embedder_and_optional_steward(
conn,
stub_idx,
tmp.path().to_path_buf(),
embedder_id,
embedder,
Some(steward),
);
let day_a = 1_700_000_000_000i64;
// Run 1 input: three episodes one second apart, all on embedding axis 0.
for i in 0..3 {
handle
.remember(
ep_at(day_a + i * 1000, "sam-paris"),
unit_emb(dim, &[(0, 1.0)]),
)
.await
.unwrap();
}
let r1 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r1.clusters_built, 1);
assert_eq!(r1.abstractions_built, 1);
assert_eq!(r1.triples_built, 1);
assert_eq!(
r1.contradictions_found, 0,
"first run: only one triple, no pair to contradict"
);
// Run 2 input: timestamps two days (2 x 86_400_000 ms) after day_a, on
// embedding axis 1 instead of axis 0.
let day_b = day_a + 86_400_000 * 2; for i in 0..3 {
handle
.remember(
ep_at(day_b + i * 1000, "sam-berlin"),
unit_emb(dim, &[(1, 1.0)]),
)
.await
.unwrap();
}
let r2 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r2.clusters_built, 1);
assert_eq!(r2.abstractions_built, 1);
assert_eq!(r2.triples_built, 1);
assert_eq!(
r2.contradictions_found, 1,
"run 2's Berlin triple contradicts run 1's Paris triple"
);
drop(handle);
// Join the writer thread on a blocking task rather than on an async worker.
tokio::task::spawn_blocking(move || join.join().unwrap())
.await
.unwrap();
});
// Verify what was persisted, using a fresh connection.
let read = open_test_db_at(&path);
let n_contras: i64 = read
.query_row("SELECT COUNT(*) FROM contradictions", [], |r| r.get(0))
.unwrap();
assert_eq!(n_contras, 1);
let (a_id, b_id, kind, expl): (String, String, String, String) = read
.query_row(
"SELECT a_memory_id, b_memory_id, kind, explanation FROM contradictions LIMIT 1",
[],
|r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
)
.unwrap();
// The stored pair is expected in ascending id order (string comparison).
assert!(a_id < b_id, "expected a < b after normalisation");
assert_eq!(kind, "overlapping_single_valued_predicate");
assert!(expl.contains("Paris") || expl.contains("Berlin"));
// Run 3: a brand-new writer and steward over the same database file and
// the same stub LLM client, with no new episodes remembered.
runtime.block_on(async {
let conn = open_test_db_at(&path);
let stub_idx = Arc::new(StubVectorIndex::new(dim));
let embedder: Arc<dyn solo_core::Embedder> =
Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
let s2 = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));
let WriterSpawn { handle, join } =
WriterActor::spawn_full_with_embedder_and_optional_steward(
conn,
stub_idx,
tmp.path().to_path_buf(),
embedder_id,
embedder,
Some(s2),
);
let r3 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r3.episodes_seen, 0, "no new candidates");
assert_eq!(r3.contradictions_found, 0);
drop(handle);
tokio::task::spawn_blocking(move || join.join().unwrap())
.await
.unwrap();
});
// The contradiction count must be unchanged by run 3.
let read = open_test_db_at(&path);
let n_contras: i64 = read
.query_row("SELECT COUNT(*) FROM contradictions", [], |r| r.get(0))
.unwrap();
assert_eq!(n_contras, 1);
}
// Runs the same Paris/Berlin scenario as the cross-runs contradiction test,
// but with a default stub for which `steward.has_llm()` is false. Both runs
// must still build an abstraction and a triple, yet report zero
// contradictions, make exactly two LLM calls in total, and persist no rows in
// `contradictions`.
#[test]
fn contradiction_sweep_skipped_when_no_llm_client() {
use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
use crate::writer::ConsolidationScope;
use solo_steward::test_support::StubLlmClient;
use solo_steward::{Steward, StewardConfig};
let tmp = tempfile::TempDir::new().unwrap();
let path = tmp.path().join("test.db");
let dim = 4usize;
// Register the embedder on a short-lived connection that is closed before
// the writer opens its own.
let embedder_id = {
let conn = open_test_db_at(&path);
get_or_insert_embedder_id(
&conn,
&EmbedderIdentity {
name: "stub".into(),
version: "v1".into(),
dim: dim as u32,
dtype: "f32".into(),
},
)
.unwrap()
};
// Only the two abstraction replies are queued; no contradiction verdict is
// queued because the judge is expected never to be called.
let stub = Arc::new(StubLlmClient::default_stub());
stub.push_canned(
r#"{
"content": "Sam settled in Paris.",
"confidence": 0.9,
"triples": [
{ "subject_id": "Sam", "predicate": "lives_in",
"object_id": "Paris", "object_kind": "entity" }
]
}"#,
);
stub.push_canned(
r#"{
"content": "Sam moved to Berlin.",
"confidence": 0.9,
"triples": [
{ "subject_id": "Sam", "predicate": "lives_in",
"object_id": "Berlin", "object_kind": "entity" }
]
}"#,
);
let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));
// Precondition: the gate under test only applies when has_llm() is false.
assert!(
!steward.has_llm(),
"default stub must report has_llm() == false; without the gate this test reduces to the existing cross-runs test"
);
let runtime = rt_multi(2);
// Baseline for the call-count delta asserted after both runs.
let call_count_before_runs = stub.call_count();
runtime.block_on(async {
let conn = open_test_db_at(&path);
let stub_idx = Arc::new(StubVectorIndex::new(dim));
let embedder: Arc<dyn solo_core::Embedder> =
Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
let WriterSpawn { handle, join } =
WriterActor::spawn_full_with_embedder_and_optional_steward(
conn,
stub_idx,
tmp.path().to_path_buf(),
embedder_id,
embedder,
Some(steward),
);
let day_a = 1_700_000_000_000i64;
// Run 1 input: three episodes one second apart, all on embedding axis 0.
for i in 0..3 {
handle
.remember(
ep_at(day_a + i * 1000, "sam-paris"),
unit_emb(dim, &[(0, 1.0)]),
)
.await
.unwrap();
}
let r1 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r1.abstractions_built, 1, "abstraction step still runs");
assert_eq!(r1.triples_built, 1);
assert_eq!(
r1.contradictions_found, 0,
"first run: gate-skip leaves contradictions_found at 0"
);
// Run 2 input: timestamps two days (2 x 86_400_000 ms) after day_a, on
// embedding axis 1 instead of axis 0.
let day_b = day_a + 86_400_000 * 2;
for i in 0..3 {
handle
.remember(
ep_at(day_b + i * 1000, "sam-berlin"),
unit_emb(dim, &[(1, 1.0)]),
)
.await
.unwrap();
}
let r2 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r2.abstractions_built, 1);
assert_eq!(r2.triples_built, 1);
assert_eq!(
r2.contradictions_found, 0,
"gate must skip the sweep — no contradiction can be flagged \
without a real LLM"
);
drop(handle);
// Join the writer thread on a blocking task rather than on an async worker.
tokio::task::spawn_blocking(move || join.join().unwrap())
.await
.unwrap();
});
// Exactly two LLM calls across both runs: one abstraction call per run.
let calls_after = stub.call_count();
assert_eq!(
calls_after - call_count_before_runs,
2,
"stub should have exactly 2 calls (1 per abstraction); a 3rd would mean the contradiction judge ran despite the gate"
);
// Nothing may have been written to `contradictions`.
let read = open_test_db_at(&path);
let n_contras: i64 = read
.query_row("SELECT COUNT(*) FROM contradictions", [], |r| r.get(0))
.unwrap();
assert_eq!(
n_contras, 0,
"no contradictions persisted when sweep is gated off"
);
}
/// A writer spawned via `spawn_full` (no steward) still clusters episodes but
/// builds no abstractions, and `semantic_abstractions` stays empty.
#[test]
fn consolidate_without_steward_skips_abstraction_step() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let scratch = tempfile::TempDir::new().unwrap();
    let db_path = scratch.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup = open_test_db_at(&db_path);
        get_or_insert_embedder_id(&setup, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&db_path);
    let index = Arc::new(StubVectorIndex::new(dim));
    let spawned = WriterActor::spawn_full(
        writer_conn,
        index,
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let WriterSpawn { handle, join } = spawned;

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let base_ts = 1_700_000_000_000i64;
        for n in 0..3i64 {
            let label = format!("noabs-{n}");
            let episode = ep_at(base_ts + n * 1000, &label);
            handle
                .remember(episode, unit_emb(dim, &[(0, 1.0)]))
                .await
                .unwrap();
        }

        let outcome = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(outcome.clusters_built, 1);
        assert_eq!(
            outcome.abstractions_built, 0,
            "no steward → abstraction step is a no-op"
        );

        // Close the channel, then wait for the writer thread to finish.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let verify = open_test_db_at(&db_path);
    let abstraction_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM semantic_abstractions", [], |row| row.get(0))
        .unwrap();
    assert_eq!(abstraction_rows, 0);
}
/// Running `consolidate` twice over the same three episodes must not create a
/// second cluster: the second pass sees zero candidates, and the tables end up
/// with one cluster row and three episode links.
#[test]
fn consolidate_is_idempotent_on_repeated_runs() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let scratch = tempfile::TempDir::new().unwrap();
    let db_path = scratch.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup = open_test_db_at(&db_path);
        get_or_insert_embedder_id(&setup, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&db_path);
    let index = Arc::new(StubVectorIndex::new(dim));
    let spawned = WriterActor::spawn_full(
        writer_conn,
        index,
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let WriterSpawn { handle, join } = spawned;

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let base_ts = 1_700_000_000_000i64;
        for n in 0..3i64 {
            let label = format!("idem-{n}");
            let episode = ep_at(base_ts + n * 1000, &label);
            handle
                .remember(episode, unit_emb(dim, &[(0, 1.0)]))
                .await
                .unwrap();
        }

        let first_pass = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(first_pass.clusters_built, 1);
        assert_eq!(first_pass.episodes_clustered, 3);

        let second_pass = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(second_pass.episodes_seen, 0, "second pass: no candidates left");
        assert_eq!(second_pass.clusters_built, 0);

        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    // Inspect the persisted state through a fresh connection.
    let verify = open_test_db_at(&db_path);
    let cluster_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM clusters", [], |row| row.get(0))
        .unwrap();
    assert_eq!(cluster_rows, 1, "no duplicate cluster from repeated run");
    let link_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM cluster_episodes", [], |row| row.get(0))
        .unwrap();
    assert_eq!(link_rows, 3);
}
/// With `window_days: Some(2)`, consolidation only considers the three episodes
/// stamped within the last few seconds and ignores the three stamped ten days
/// ago, even though all six share the same embedding.
#[test]
fn consolidate_window_days_filters_old_episodes() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let scratch = tempfile::TempDir::new().unwrap();
    let db_path = scratch.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup = open_test_db_at(&db_path);
        get_or_insert_embedder_id(&setup, &identity).unwrap()
    };

    let writer_conn = open_test_db_at(&db_path);
    let index = Arc::new(StubVectorIndex::new(dim));
    let spawned = WriterActor::spawn_full(
        writer_conn,
        index,
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let WriterSpawn { handle, join } = spawned;

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let now = chrono::Utc::now().timestamp_millis();
        let old_ts = now - 10 * 86_400_000;

        // (timestamp, label): three recent episodes first, then three old ones.
        let seeds = [
            (now - 1000, "r1"),
            (now - 2000, "r2"),
            (now - 3000, "r3"),
            (old_ts, "o1"),
            (old_ts + 1000, "o2"),
            (old_ts + 2000, "o3"),
        ];
        // Build every episode/embedding pair up front, then store them in order.
        let batch: Vec<_> = seeds
            .iter()
            .map(|&(ts, label)| (ep_at(ts, label), unit_emb(dim, &[(0, 1.0)])))
            .collect();
        for (episode, embedding) in batch {
            handle
                .remember(episode, embedding)
                .await
                .expect("remember");
        }

        let scope = ConsolidationScope {
            window_days: Some(2),
            force_merge: false,
        };
        let outcome = handle.consolidate(scope).await.unwrap();
        assert_eq!(outcome.episodes_seen, 3, "old episodes excluded");
        assert_eq!(outcome.clusters_built, 1);
        assert_eq!(outcome.episodes_clustered, 3);

        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });
}
/// Episodes consolidated by a second writer, five days later and with a nearly
/// identical embedding, must be absorbed into the cluster created by the first
/// writer: still one cluster row, six links under the original cluster id, and
/// a centroid whose bytes changed but whose length did not.
#[test]
fn consolidate_cross_run_absorb_folds_into_existing_cluster() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let scratch = tempfile::TempDir::new().unwrap();
    let db_path = scratch.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup = open_test_db_at(&db_path);
        get_or_insert_embedder_id(&setup, &identity).unwrap()
    };

    // ---- Run 1: first writer builds a fresh cluster. ----
    let first_conn = open_test_db_at(&db_path);
    let first_index = Arc::new(StubVectorIndex::new(dim));
    let WriterSpawn { handle, join } = WriterActor::spawn_full(
        first_conn,
        first_index.clone(),
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let day_a = 1_700_000_000_000i64;
    let first_runtime = rt_multi(2);
    first_runtime.block_on(async {
        for n in 0..3i64 {
            let label = format!("pa{n}");
            let episode = ep_at(day_a + n * 1000, &label);
            handle
                .remember(episode, unit_emb(dim, &[(0, 1.0)]))
                .await
                .unwrap();
        }
        let outcome = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(outcome.clusters_built, 1, "run 1: one fresh cluster");
        assert_eq!(outcome.clusters_absorbed, 0, "run 1: nothing to absorb into yet");
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    // Snapshot the cluster produced by run 1 for the later comparison.
    let (run1_cluster_id, run1_centroid_bytes): (String, Vec<u8>) = {
        let snapshot = open_test_db_at(&db_path);
        snapshot
            .query_row("SELECT cluster_id, centroid FROM clusters", [], |row| {
                let id: String = row.get(0)?;
                let centroid: Vec<u8> = row.get(1)?;
                Ok((id, centroid))
            })
            .unwrap()
    };

    // ---- Run 2: a second writer on the same database. ----
    let second_conn = open_test_db_at(&db_path);
    let second_index = Arc::new(StubVectorIndex::new(dim));
    let WriterSpawn { handle: handle2, join: join2 } = WriterActor::spawn_full(
        second_conn,
        second_index.clone(),
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let day_b = day_a + 86_400_000 * 5;
    let second_runtime = rt_multi(2);
    second_runtime.block_on(async {
        for n in 0..3i64 {
            let label = format!("pb{n}");
            let episode = ep_at(day_b + n * 1000, &label);
            handle2
                .remember(episode, unit_emb(dim, &[(0, 0.99), (1, 0.01)]))
                .await
                .unwrap();
        }
        let outcome = handle2
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(outcome.clusters_built, 0, "run 2: no fresh cluster row");
        assert_eq!(outcome.clusters_absorbed, 1, "run 2: absorbed into run-1 cluster");
        assert_eq!(outcome.episodes_clustered, 3);
        drop(handle2);
        tokio::task::spawn_blocking(move || join2.join().unwrap())
            .await
            .unwrap();
    });

    // ---- Persisted state after both runs. ----
    let verify = open_test_db_at(&db_path);
    let cluster_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM clusters", [], |row| row.get(0))
        .unwrap();
    assert_eq!(cluster_rows, 1, "still exactly one cluster row");

    let link_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM cluster_episodes", [], |row| row.get(0))
        .unwrap();
    assert_eq!(link_rows, 6, "all 6 episodes linked to one cluster");

    let links_under_run1: i64 = verify
        .query_row(
            "SELECT COUNT(*) FROM cluster_episodes WHERE cluster_id = ?1",
            params![run1_cluster_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(links_under_run1, 6);

    let new_centroid_bytes: Vec<u8> = verify
        .query_row(
            "SELECT centroid FROM clusters WHERE cluster_id = ?1",
            params![run1_cluster_id],
            |row| row.get(0),
        )
        .unwrap();
    assert_ne!(
        new_centroid_bytes, run1_centroid_bytes,
        "absorb refreshed centroid"
    );
    assert_eq!(
        new_centroid_bytes.len(),
        run1_centroid_bytes.len(),
        "centroid dim unchanged"
    );
}
/// When a second consolidation run absorbs new episodes into an existing
/// cluster, the cluster's abstraction is regenerated rather than duplicated.
///
/// Two writers are run back to back against the same database and share one
/// steward. The assertions require that after run 2 there is one cluster, one
/// abstraction whose content contains "Regenerated", and exactly the two
/// triples (`frequency`, `likes`) from the regenerated reply.
#[test]
fn consolidate_cross_run_absorb_regenerates_abstraction() {
use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
use crate::writer::ConsolidationScope;
use solo_steward::test_support::StubLlmClient;
use solo_steward::{Steward, StewardConfig};
let tmp = tempfile::TempDir::new().unwrap();
let path = tmp.path().join("test.db");
let dim = 4usize;
// Register the stub embedder on a short-lived connection (dropped at the end
// of this block) and keep the returned id for both writers.
let embedder_id = {
let conn = open_test_db_at(&path);
get_or_insert_embedder_id(
&conn,
&EmbedderIdentity {
name: "stub".into(),
version: "v1".into(),
dim: dim as u32,
dtype: "f32".into(),
},
)
.unwrap()
};
// Reply expected for run 1: one triple.
let original = r#"{
"content": "Original pasta thoughts.",
"confidence": 0.7,
"triples": [
{ "subject_id": "user", "predicate": "likes", "object_id": "pasta", "object_kind": "literal" }
]
}"#;
// Reply expected for run 2: two triples and content containing "Regenerated".
let regenerated = r#"{
"content": "Regenerated pasta thoughts (now incl. day-B episodes).",
"confidence": 0.85,
"triples": [
{ "subject_id": "user", "predicate": "likes", "object_id": "pasta", "object_kind": "literal" },
{ "subject_id": "user", "predicate": "frequency", "object_id": "weekly", "object_kind": "literal" }
]
}"#;
// `original` is supplied at construction and `regenerated` is pushed afterwards.
// NOTE(review): presumably replies are served in that order — the triple counts
// asserted below (1 in run 1, 2 in run 2) rely on it; confirm against StubLlmClient.
let stub = Arc::new(StubLlmClient::with_canned("stub-llm", original));
stub.push_canned(regenerated);
let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));
let day_a = 1_700_000_000_000i64;
// Run-2 episodes are stamped five days after run-1 episodes.
let day_b = day_a + 86_400_000 * 5;
let runtime = rt_multi(2);
runtime.block_on(async {
// ---- Run 1: first writer, three episodes on axis 0. ----
let conn = open_test_db_at(&path);
let stub_idx = Arc::new(StubVectorIndex::new(dim));
let embedder: Arc<dyn solo_core::Embedder> =
Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
let WriterSpawn { handle, join } =
WriterActor::spawn_full_with_embedder_and_optional_steward(
conn,
stub_idx,
tmp.path().to_path_buf(),
embedder_id,
embedder,
Some(steward.clone()),
);
for (i, ts_offset) in (0..3i64).enumerate() {
let ep = ep_at(day_a + ts_offset * 1000, &format!("pa{i}"));
handle
.remember(ep, unit_emb(dim, &[(0, 1.0)]))
.await
.unwrap();
}
let r1 = handle
.consolidate(ConsolidationScope::default())
.await
.unwrap();
assert_eq!(r1.clusters_built, 1);
assert_eq!(r1.abstractions_built, 1, "run 1: original abstraction");
assert_eq!(r1.abstractions_regenerated, 0, "run 1: nothing to regen");
assert_eq!(r1.triples_built, 1);
// Drop the handle and join the first writer's thread (on the blocking pool)
// before the second writer is spawned on the same database file.
drop(handle);
tokio::task::spawn_blocking(move || join.join().unwrap())
.await
.unwrap();
// ---- Run 2: fresh writer, same steward, three episodes almost on axis 0. ----
let conn2 = open_test_db_at(&path);
let stub_idx2 = Arc::new(StubVectorIndex::new(dim));
let embedder2: Arc<dyn solo_core::Embedder> =
Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
let WriterSpawn { handle: handle2, join: join2 } =
WriterActor::spawn_full_with_embedder_and_optional_steward(
conn2,
stub_idx2,
tmp.path().to_path_buf(),
embedder_id,
embedder2,
Some(steward.clone()),
);
for (i, ts_offset) in (0..3i64).enumerate() {
let ep = ep_at(day_b + ts_offset * 1000, &format!("pb{i}"));
handle2
.remember(ep, unit_emb(dim, &[(0, 0.99), (1, 0.01)]))
.await
.unwrap();
}
let r2 = handle2
.consolidate(ConsolidationScope::default())
.await
.unwrap();
// The new episodes must fold into the run-1 cluster and trigger a regeneration
// instead of a freshly built abstraction.
assert_eq!(r2.clusters_built, 0, "run 2: absorbed, no fresh cluster row");
assert_eq!(r2.clusters_absorbed, 1);
assert_eq!(r2.abstractions_built, 0, "run 2: in-run loop skips absorbed");
assert_eq!(
r2.abstractions_regenerated, 1,
"run 2: regen pass refreshes the absorbed-into existing cluster"
);
assert_eq!(r2.triples_built, 2, "regen produced 2 fresh triples");
drop(handle2);
tokio::task::spawn_blocking(move || join2.join().unwrap())
.await
.unwrap();
});
// ---- Persisted state, read through a fresh connection. ----
let read = open_test_db_at(&path);
let n_clusters: i64 = read
.query_row("SELECT COUNT(*) FROM clusters", [], |r| r.get(0))
.unwrap();
assert_eq!(n_clusters, 1, "still one cluster row");
let abs_count: i64 = read
.query_row("SELECT COUNT(*) FROM semantic_abstractions", [], |r| r.get(0))
.unwrap();
assert_eq!(abs_count, 1, "exactly one abstraction (the regenerated one)");
let abs_content: String = read
.query_row(
"SELECT content FROM semantic_abstractions LIMIT 1",
[],
|r| r.get(0),
)
.unwrap();
assert!(
abs_content.contains("Regenerated"),
"abstraction content should be the regenerated one; got: {abs_content}"
);
// Only the two regenerated triples may remain; a leftover run-1 triple would
// make this count 3.
let n_triples: i64 = read
.query_row("SELECT COUNT(*) FROM triples", [], |r| r.get(0))
.unwrap();
assert_eq!(n_triples, 2);
let n_with_cluster: i64 = read
.query_row(
"SELECT COUNT(*) FROM triples WHERE cluster_id IS NOT NULL",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(n_with_cluster, 2);
// Predicates are fetched sorted so the comparison does not depend on insert order.
let triple_predicates: Vec<String> = {
let mut stmt = read
.prepare("SELECT predicate FROM triples ORDER BY predicate")
.unwrap();
stmt.query_map([], |r| r.get::<_, String>(0))
.unwrap()
.map(|r| r.unwrap())
.collect()
};
assert_eq!(triple_predicates, vec!["frequency".to_string(), "likes".to_string()]);
}
/// Consolidation must coalesce two pre-existing clusters whose centroids point
/// in nearly the same direction.
///
/// The database is seeded by hand with cluster A (5 episodes, centroid on
/// axis 0) and cluster B (3 episodes, centroid built from `0.99` on axis 0 and
/// `0.01` on axis 1, then normalized). After one `remember` plus `consolidate`
/// the report must show one existing-vs-existing merge and one regenerated
/// abstraction, and the tables must show A as the only cluster, owning all
/// 8 episode links and carrying the canned "Merged cluster" abstraction.
#[test]
fn consolidate_existing_vs_existing_merge_coalesces_drifted_clusters() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    const INSERT_CLUSTER: &str = "INSERT INTO clusters (cluster_id, centroid, centroid_dtype, centroid_dim, coherence, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_EPISODE: &str = "INSERT INTO episodes (memory_id, ts_ms, source_type, content, encoding_context_json, confidence, strength, salience, tier, created_at_ms, updated_at_ms) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)";
    const INSERT_EMBEDDING: &str = "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_LINK: &str = "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)";

    /// Builds a `dim`-length f32 vector from sparse `(index, value)` components,
    /// scales it to unit L2 norm when the norm is non-zero, and returns the
    /// vector's raw bytes.
    fn emb_bytes(dim: usize, components: &[(usize, f32)]) -> Vec<u8> {
        let mut v = vec![0.0f32; dim];
        for &(i, x) in components {
            v[i] = x;
        }
        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            for x in v.iter_mut() {
                *x /= norm;
            }
        }
        bytemuck::cast_slice(&v).to_vec()
    }

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let dim = 4usize;
    let embedder_id = {
        let conn = open_test_db_at(&path);
        get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "stub".into(),
                version: "v1".into(),
                dim: dim as u32,
                dtype: "f32".into(),
            },
        )
        .unwrap()
    };

    let cluster_a_id = "00000000-0000-0000-0000-0000000000aa";
    let cluster_b_id = "00000000-0000-0000-0000-0000000000bb";
    let now_ms = chrono::Utc::now().timestamp_millis();
    let centroid_a = emb_bytes(dim, &[(0, 1.0)]);
    let centroid_b = emb_bytes(dim, &[(0, 0.99), (1, 0.01)]);

    // Seed both clusters and their member episodes directly via SQL.
    {
        let conn = open_test_db_at(&path);
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_a_id, centroid_a, dim as i64, 0.95, now_ms],
        )
        .unwrap();
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_b_id, centroid_b, dim as i64, 0.93, now_ms],
        )
        .unwrap();

        // (cluster id, memory-id tag, member embedding, member count).
        // Every member of a cluster stores that cluster's centroid as its vector.
        for (cluster_id, tag, centroid, count) in [
            (cluster_a_id, "a", &centroid_a, 5i64),
            (cluster_b_id, "b", &centroid_b, 3i64),
        ] {
            for i in 0..count {
                let mid = format!("00000000-0000-0000-0000-00000000{tag}{i:03}");
                // Timestamps ascend with `i` and all lie before `now_ms`.
                let ts_ms = now_ms - (count - i) * 1000;
                let content = format!("{tag}-ep-{i}");
                conn.execute(INSERT_EPISODE, params![mid, ts_ms, content, now_ms, now_ms])
                    .unwrap();
                conn.execute(
                    INSERT_EMBEDDING,
                    params![mid, embedder_id, dim as i64, centroid, now_ms],
                )
                .unwrap();
                conn.execute(INSERT_LINK, params![cluster_id, mid]).unwrap();
            }
        }

        let n_pre: i64 = conn
            .query_row("SELECT COUNT(*) FROM clusters", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n_pre, 2);
    }

    let regen_response = r#"{
"content": "Merged cluster (drifted A+B coalesced).",
"confidence": 0.9,
"triples": []
}"#;
    let stub = Arc::new(StubLlmClient::with_canned("stub-llm", regen_response));
    let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let stub_idx = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                conn,
                stub_idx,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                Some(steward.clone()),
            );

        // One unclustered episode on axis 3, which neither seeded centroid uses,
        // so the consolidation pass has a candidate to process.
        let trigger_ep = ep_at(now_ms + 1000, "trigger");
        handle
            .remember(trigger_ep, unit_emb(dim, &[(3, 1.0)]))
            .await
            .unwrap();

        let report = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(report.clusters_built, 0);
        assert_eq!(report.clusters_absorbed, 0);
        assert_eq!(
            report.existing_clusters_merged, 1,
            "expected one existing cluster absorbed into another"
        );
        assert_eq!(report.abstractions_regenerated, 1);

        // Close the channel, then wait for the writer thread to finish.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let n_clusters: i64 = read
        .query_row("SELECT COUNT(*) FROM clusters", [], |r| r.get(0))
        .unwrap();
    assert_eq!(n_clusters, 1, "loser cluster row dropped");
    let surviving_id: String = read
        .query_row("SELECT cluster_id FROM clusters", [], |r| r.get(0))
        .unwrap();
    assert_eq!(
        surviving_id, cluster_a_id,
        "A (most episodes) must be the survivor"
    );
    let n_links: i64 = read
        .query_row(
            "SELECT COUNT(*) FROM cluster_episodes WHERE cluster_id = ?",
            params![cluster_a_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(n_links, 8, "all 8 episodes now linked under cluster_a");
    let abs_content: String = read
        .query_row(
            "SELECT content FROM semantic_abstractions WHERE cluster_id = ?",
            params![cluster_a_id],
            |r| r.get(0),
        )
        .unwrap();
    assert!(
        abs_content.contains("Merged cluster"),
        "regen abstraction expected; got: {abs_content}"
    );
}
/// `force_merge: true` must run the existing-vs-existing merge even when there
/// are no unclustered episodes.
///
/// The database is seeded by hand with cluster A (5 episodes) and cluster B
/// (3 episodes) on nearly identical centroids. No episode is added through the
/// writer, so `episodes_seen` is 0, yet the report must show one merge and one
/// regenerated abstraction, leaving a single cluster with all 8 links under A.
#[test]
fn consolidate_force_merge_fires_with_no_candidates() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    const INSERT_CLUSTER: &str = "INSERT INTO clusters (cluster_id, centroid, centroid_dtype, centroid_dim, coherence, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_EPISODE: &str = "INSERT INTO episodes (memory_id, ts_ms, source_type, content, encoding_context_json, confidence, strength, salience, tier, created_at_ms, updated_at_ms) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)";
    const INSERT_EMBEDDING: &str = "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_LINK: &str = "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)";

    /// Builds a `dim`-length f32 vector from sparse `(index, value)` components,
    /// scales it to unit L2 norm when the norm is non-zero, and returns the
    /// vector's raw bytes.
    fn emb_bytes(dim: usize, components: &[(usize, f32)]) -> Vec<u8> {
        let mut v = vec![0.0f32; dim];
        for &(i, x) in components {
            v[i] = x;
        }
        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            for x in v.iter_mut() {
                *x /= norm;
            }
        }
        bytemuck::cast_slice(&v).to_vec()
    }

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let dim = 4usize;
    let embedder_id = {
        let conn = open_test_db_at(&path);
        get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "stub".into(),
                version: "v1".into(),
                dim: dim as u32,
                dtype: "f32".into(),
            },
        )
        .unwrap()
    };

    let cluster_a_id = "00000000-0000-0000-0000-0000000000aa";
    let cluster_b_id = "00000000-0000-0000-0000-0000000000bb";
    let now_ms = chrono::Utc::now().timestamp_millis();
    let centroid_a = emb_bytes(dim, &[(0, 1.0)]);
    let centroid_b = emb_bytes(dim, &[(0, 0.99), (1, 0.01)]);

    // Seed both clusters and their member episodes directly via SQL.
    {
        let conn = open_test_db_at(&path);
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_a_id, centroid_a, dim as i64, 0.95, now_ms],
        )
        .unwrap();
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_b_id, centroid_b, dim as i64, 0.93, now_ms],
        )
        .unwrap();

        // (cluster id, memory-id tag, member embedding, member count).
        // Every member of a cluster stores that cluster's centroid as its vector.
        for (cluster_id, tag, centroid, count) in [
            (cluster_a_id, "a", &centroid_a, 5i64),
            (cluster_b_id, "b", &centroid_b, 3i64),
        ] {
            for i in 0..count {
                let mid = format!("00000000-0000-0000-0000-00000000{tag}{i:03}");
                // Timestamps ascend with `i` and all lie before `now_ms`.
                let ts_ms = now_ms - (count - i) * 1000;
                let content = format!("{tag}-ep-{i}");
                conn.execute(INSERT_EPISODE, params![mid, ts_ms, content, now_ms, now_ms])
                    .unwrap();
                conn.execute(
                    INSERT_EMBEDDING,
                    params![mid, embedder_id, dim as i64, centroid, now_ms],
                )
                .unwrap();
                conn.execute(INSERT_LINK, params![cluster_id, mid]).unwrap();
            }
        }
    }

    let regen_response = r#"{
"content": "force-merged.",
"confidence": 0.9,
"triples": []
}"#;
    let stub = Arc::new(StubLlmClient::with_canned("stub-llm", regen_response));
    let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let stub_idx = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                conn,
                stub_idx,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                Some(steward.clone()),
            );

        // No `remember` call here: the merge has to fire from the flag alone.
        let report = handle
            .consolidate(ConsolidationScope {
                window_days: None,
                force_merge: true,
            })
            .await
            .unwrap();
        assert_eq!(report.episodes_seen, 0, "no unclustered episodes to feed");
        assert_eq!(report.clusters_built, 0);
        assert_eq!(report.clusters_absorbed, 0);
        assert_eq!(
            report.existing_clusters_merged, 1,
            "force_merge=true should still trigger existing-vs-existing merge"
        );
        assert_eq!(
            report.abstractions_regenerated, 1,
            "regen runs for the merge survivor"
        );

        // Close the channel, then wait for the writer thread to finish.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let n_clusters: i64 = read
        .query_row("SELECT COUNT(*) FROM clusters", [], |r| r.get(0))
        .unwrap();
    assert_eq!(n_clusters, 1);
    let n_links: i64 = read
        .query_row(
            "SELECT COUNT(*) FROM cluster_episodes WHERE cluster_id = ?",
            params![cluster_a_id],
            |r| r.get(0),
        )
        .unwrap();
    assert_eq!(n_links, 8);
}
/// With the default scope and no unclustered episodes, consolidation must leave
/// two mergeable clusters untouched.
///
/// Clusters A and B (3 episodes each, nearly identical centroids) are seeded by
/// hand. `consolidate(ConsolidationScope::default())` must report zero episodes
/// seen, zero merges and zero regenerations, and both cluster rows must remain.
#[test]
fn consolidate_no_force_merge_skips_merge_on_empty_candidates() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    const INSERT_CLUSTER: &str = "INSERT INTO clusters (cluster_id, centroid, centroid_dtype, centroid_dim, coherence, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_EPISODE: &str = "INSERT INTO episodes (memory_id, ts_ms, source_type, content, encoding_context_json, confidence, strength, salience, tier, created_at_ms, updated_at_ms) VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)";
    const INSERT_EMBEDDING: &str = "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms) VALUES (?, ?, 'f32', ?, ?, ?)";
    const INSERT_LINK: &str = "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)";

    /// Builds a `dim`-length f32 vector from sparse `(index, value)` components,
    /// scales it to unit L2 norm when the norm is non-zero, and returns the
    /// vector's raw bytes.
    fn emb_bytes(dim: usize, components: &[(usize, f32)]) -> Vec<u8> {
        let mut v = vec![0.0f32; dim];
        for &(i, x) in components {
            v[i] = x;
        }
        let norm = v.iter().map(|x| x * x).sum::<f32>().sqrt();
        if norm > 0.0 {
            for x in v.iter_mut() {
                *x /= norm;
            }
        }
        bytemuck::cast_slice(&v).to_vec()
    }

    let tmp = tempfile::TempDir::new().unwrap();
    let path = tmp.path().join("test.db");
    let dim = 4usize;
    let embedder_id = {
        let conn = open_test_db_at(&path);
        get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "stub".into(),
                version: "v1".into(),
                dim: dim as u32,
                dtype: "f32".into(),
            },
        )
        .unwrap()
    };

    let cluster_a_id = "00000000-0000-0000-0000-0000000000aa";
    let cluster_b_id = "00000000-0000-0000-0000-0000000000bb";
    let now_ms = chrono::Utc::now().timestamp_millis();
    let centroid_a = emb_bytes(dim, &[(0, 1.0)]);
    let centroid_b = emb_bytes(dim, &[(0, 0.99), (1, 0.01)]);

    // Seed both clusters and three member episodes each, directly via SQL.
    {
        let conn = open_test_db_at(&path);
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_a_id, centroid_a, dim as i64, 0.95, now_ms],
        )
        .unwrap();
        conn.execute(
            INSERT_CLUSTER,
            params![cluster_b_id, centroid_b, dim as i64, 0.93, now_ms],
        )
        .unwrap();

        // Rows are written interleaved: a0, b0, a1, b1, a2, b2.
        for i in 0..3i64 {
            for (cluster, prefix, centroid) in [
                (cluster_a_id, "a", &centroid_a),
                (cluster_b_id, "b", &centroid_b),
            ] {
                let mid = format!("00000000-0000-0000-0000-00000000{prefix}{i:03}");
                let ts_ms = now_ms - (3 - i) * 1000;
                let content = format!("{prefix}-ep-{i}");
                conn.execute(INSERT_EPISODE, params![mid, ts_ms, content, now_ms, now_ms])
                    .unwrap();
                conn.execute(
                    INSERT_EMBEDDING,
                    params![mid, embedder_id, dim as i64, centroid, now_ms],
                )
                .unwrap();
                conn.execute(INSERT_LINK, params![cluster, mid]).unwrap();
            }
        }
    }

    // No canned reply is queued on the stub for this run.
    let stub = Arc::new(StubLlmClient::default_stub());
    let steward = Arc::new(Steward::new(stub.clone(), StewardConfig::default()));

    let runtime = rt_multi(2);
    runtime.block_on(async {
        let conn = open_test_db_at(&path);
        let stub_idx = Arc::new(StubVectorIndex::new(dim));
        let embedder: Arc<dyn solo_core::Embedder> =
            Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim));
        let WriterSpawn { handle, join } =
            WriterActor::spawn_full_with_embedder_and_optional_steward(
                conn,
                stub_idx,
                tmp.path().to_path_buf(),
                embedder_id,
                embedder,
                Some(steward.clone()),
            );

        let report = handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(report.episodes_seen, 0);
        assert_eq!(report.existing_clusters_merged, 0);
        assert_eq!(report.abstractions_regenerated, 0);

        // Close the channel, then wait for the writer thread to finish.
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    let read = open_test_db_at(&path);
    let n_clusters: i64 = read
        .query_row("SELECT COUNT(*) FROM clusters", [], |r| r.get(0))
        .unwrap();
    assert_eq!(n_clusters, 2, "no merge → both clusters still present");
}
/// A second run whose episodes sit on a different axis (2) than the first run's
/// cluster (axis 0) must persist its own cluster instead of being absorbed:
/// two cluster rows and six episode links in total.
#[test]
fn consolidate_cross_run_no_absorb_when_themes_unrelated() {
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::writer::ConsolidationScope;

    let scratch = tempfile::TempDir::new().unwrap();
    let db_path = scratch.path().join("test.db");
    let dim = 4usize;

    let identity = EmbedderIdentity {
        name: "stub".into(),
        version: "v1".into(),
        dim: dim as u32,
        dtype: "f32".into(),
    };
    let embedder_id = {
        let setup = open_test_db_at(&db_path);
        get_or_insert_embedder_id(&setup, &identity).unwrap()
    };

    // ---- Run 1: "pasta" episodes on axis 0. ----
    let first_conn = open_test_db_at(&db_path);
    let first_index = Arc::new(StubVectorIndex::new(dim));
    let WriterSpawn { handle, join } = WriterActor::spawn_full(
        first_conn,
        first_index.clone(),
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let day_a = 1_700_000_000_000i64;
    let first_runtime = rt_multi(2);
    first_runtime.block_on(async {
        for n in 0..3i64 {
            let label = format!("pasta{n}");
            let episode = ep_at(day_a + n * 1000, &label);
            handle
                .remember(episode, unit_emb(dim, &[(0, 1.0)]))
                .await
                .unwrap();
        }
        handle
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        drop(handle);
        tokio::task::spawn_blocking(move || join.join().unwrap())
            .await
            .unwrap();
    });

    // ---- Run 2: "rust" episodes on axis 2, five days later. ----
    let second_conn = open_test_db_at(&db_path);
    let second_index = Arc::new(StubVectorIndex::new(dim));
    let WriterSpawn { handle: handle2, join: join2 } = WriterActor::spawn_full(
        second_conn,
        second_index.clone(),
        scratch.path().to_path_buf(),
        embedder_id,
    );
    let day_b = day_a + 86_400_000 * 5;
    let second_runtime = rt_multi(2);
    second_runtime.block_on(async {
        for n in 0..3i64 {
            let label = format!("rust{n}");
            let episode = ep_at(day_b + n * 1000, &label);
            handle2
                .remember(episode, unit_emb(dim, &[(2, 1.0)]))
                .await
                .unwrap();
        }
        let outcome = handle2
            .consolidate(ConsolidationScope::default())
            .await
            .unwrap();
        assert_eq!(outcome.clusters_built, 1, "fresh unrelated cluster persisted");
        assert_eq!(outcome.clusters_absorbed, 0, "no absorb when themes are orthogonal");
        drop(handle2);
        tokio::task::spawn_blocking(move || join2.join().unwrap())
            .await
            .unwrap();
    });

    let verify = open_test_db_at(&db_path);
    let cluster_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM clusters", [], |row| row.get(0))
        .unwrap();
    assert_eq!(cluster_rows, 2);
    let link_rows: i64 = verify
        .query_row("SELECT COUNT(*) FROM cluster_episodes", [], |row| row.get(0))
        .unwrap();
    assert_eq!(link_rows, 6);
}
/// Calling `reembed` on a writer spawned through plain `spawn_full` must fail
/// with an error whose text mentions `spawn_full_with_embedder`.
#[test]
fn reembed_without_embedder_returns_clear_error() {
    use crate::writer::ReembedScope;

    let (conn, _tmp) = open_test_db();
    let index = Arc::new(StubVectorIndex::new(4));
    // Only the handle is kept; the join half is discarded without being bound.
    let WriterSpawn { handle, .. } =
        WriterActor::spawn_full(conn, index, std::env::temp_dir(), 1);

    let runtime = rt_multi(1);
    let outcome = runtime.block_on(async { handle.reembed(ReembedScope::default()).await });
    let err = outcome.unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("spawn_full_with_embedder"),
        "expected guidance pointing at the right constructor; got: {err}"
    );
}