use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use rusqlite::params;
use solo_core::{Cluster, Embedding, EmbeddingDtype, Episode, Error, MemoryId, Result, Tier};
use tokio::sync::Notify;
use crate::reader::ReaderPool;
use crate::writer::{AttachAbstractionBatchReport, WriteHandle};
/// Count-based wake-up signal for the triples batch worker.
///
/// The write path calls [`TriplesBatchSignal::note_episode_remembered`] once per
/// remembered episode. When the per-batch counter reaches
/// `trigger_episode_count`, one task waiting on [`TriplesBatchSignal::notified`]
/// is woken. If no task is waiting yet, [`Notify::notify_one`] stores a single
/// permit that the next `notified()` consumes. The consumer calls
/// [`TriplesBatchSignal::reset`] to start a fresh count.
///
/// A threshold of `0` disables the count-based trigger: the counter never
/// moves and no notification is ever sent.
#[derive(Debug)]
pub struct TriplesBatchSignal {
    /// Episodes noted since the last `reset` (only incremented when the
    /// trigger is enabled).
    episodes_since_batch: AtomicU64,
    /// Wakes the batch worker when the threshold is crossed.
    notify: Notify,
    /// Count at which the signal fires; `0` disables counting.
    trigger_episode_count: u64,
}
impl TriplesBatchSignal {
    /// Creates a signal that fires after `trigger_episode_count` episodes
    /// (`0` disables the count-based trigger).
    pub fn new(trigger_episode_count: u64) -> Self {
        Self {
            episodes_since_batch: AtomicU64::new(0),
            notify: Notify::new(),
            trigger_episode_count,
        }
    }
    /// Records one remembered episode and wakes the batch worker when the
    /// counter reaches the threshold.
    ///
    /// No-op when the threshold is `0`.
    pub fn note_episode_remembered(&self) {
        if self.trigger_episode_count == 0 {
            return;
        }
        let new_count = self.episodes_since_batch.fetch_add(1, Ordering::Relaxed) + 1;
        // Fire only on the exact crossing. `fetch_add` hands each count value
        // to exactly one caller, so this notifies at most once per batch window.
        // With `>=`, every episode remembered while the batch is running would
        // store a new permit. The worker would then wake immediately after
        // `reset` and run a second batch for the same crossing.
        if new_count == self.trigger_episode_count {
            self.notify.notify_one();
        }
    }
    /// Starts a new batch window by zeroing the counter.
    ///
    /// This does not withdraw a permit that `notify_one` may already have
    /// stored.
    pub fn reset(&self) {
        self.episodes_since_batch.store(0, Ordering::Relaxed);
    }
    /// Future that completes when the count threshold is crossed, or
    /// immediately if a permit is already stored.
    pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
        self.notify.notified()
    }
    /// Current number of episodes counted in this batch window.
    pub fn episodes_since_batch(&self) -> u64 {
        self.episodes_since_batch.load(Ordering::Relaxed)
    }
    /// The configured threshold (`0` = count trigger disabled).
    pub fn trigger_episode_count(&self) -> u64 {
        self.trigger_episode_count
    }
}
/// Runs one triples-extraction batch tick.
///
/// The tick proceeds as follows:
/// 1. Clone the steward out of `steward_slot`. The read lock is released at
///    the end of that statement. Return `Ok(None)` if the slot is empty or
///    the steward reports no LLM client.
/// 2. Load up to `limit` clusters without an abstraction, together with their
///    member episodes, via `fetch_clusters_without_abstractions`.
/// 3. Hand them to `Steward::extract_triples_batch`, bounded per cluster by
///    `per_cluster_timeout`.
/// 4. If any abstractions came back, persist them with a single
///    `attach_abstraction_batch` writer call, which also receives
///    `audit_principal`, and return its report.
///
/// Returns `Ok(None)` whenever nothing was dispatched to the writer.
///
/// # Errors
/// Propagates failures from the cluster/episode reads and from the writer.
pub async fn run_triples_batch_tick(
    reader: &ReaderPool,
    write_handle: &WriteHandle,
    steward_slot: &Arc<tokio::sync::RwLock<Option<Arc<solo_steward::Steward>>>>,
    current_embedder_id: i64,
    limit: usize,
    per_cluster_timeout: Duration,
    audit_principal: Option<String>,
) -> Result<Option<AttachAbstractionBatchReport>> {
    // Take a cheap Arc clone so the slot's read guard is dropped right here.
    let snapshot: Option<Arc<solo_steward::Steward>> =
        steward_slot.read().await.as_ref().cloned();
    let steward = if let Some(found) = snapshot {
        found
    } else {
        tracing::debug!("triples_batch: steward_slot empty; nothing to do this tick");
        return Ok(None);
    };

    if !steward.has_llm() {
        tracing::debug!("triples_batch: steward has no LLM client; skipping batch");
        return Ok(None);
    }

    let started = Instant::now();
    let clusters_with_eps =
        fetch_clusters_without_abstractions(reader, current_embedder_id, limit).await?;
    if clusters_with_eps.is_empty() {
        tracing::debug!("triples_batch: no clusters need abstractions; skipping");
        return Ok(None);
    }

    let cluster_count = clusters_with_eps.len();
    let episode_count: usize = clusters_with_eps
        .iter()
        .fold(0usize, |total, (_, eps)| total + eps.len());

    let outcome = steward
        .extract_triples_batch(clusters_with_eps, per_cluster_timeout)
        .await;
    let deferred_count = outcome.deferred_count;
    let items: Vec<(MemoryId, solo_core::SemanticAbstraction)> = outcome.abstractions;
    // Saturate rather than truncate if the elapsed millis ever exceed u64.
    let duration_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);

    if items.is_empty() {
        tracing::info!(
            cluster_count,
            episode_count,
            duration_ms,
            deferred_count,
            "triples_batch: no abstractions to persist this tick \
             (all clusters failed or were deferred); skipping \
             AttachAbstractionBatch dispatch"
        );
        return Ok(None);
    }

    let report = write_handle
        .attach_abstraction_batch(
            items,
            episode_count,
            duration_ms,
            deferred_count,
            audit_principal,
        )
        .await?;
    tracing::info!(
        cluster_count,
        episode_count,
        abstractions_built = report.abstractions_built,
        triples_extracted = report.triples_extracted,
        clusters_failed = report.clusters_failed,
        clusters_deferred = report.clusters_deferred,
        duration_ms,
        "triples_batch: tick complete"
    );
    Ok(Some(report))
}
/// Loads up to `limit` clusters that still lack a semantic abstraction, in
/// `rowid` order, each paired with its active member episodes (oldest first).
///
/// Only clusters with at least one `status = 'active'` member episode are
/// selected. Clusters whose members have all gone inactive therefore no longer
/// occupy the `LIMIT` window on every tick and starve the rest of the table.
/// Rows whose `cluster_id` fails to parse are logged and skipped.
/// NOTE(review): such rows still occupy the window. The centroid is rebuilt
/// with `build_centroid` and is `None` when its columns are missing or
/// unrecognised.
///
/// Returns an empty vector immediately when `limit == 0`.
///
/// # Errors
/// Propagates reader/query failures, including those raised while loading a
/// cluster's episodes.
pub async fn fetch_clusters_without_abstractions(
    reader: &ReaderPool,
    current_embedder_id: i64,
    limit: usize,
) -> Result<Vec<(Cluster, Vec<Episode>)>> {
    if limit == 0 {
        return Ok(Vec::new());
    }
    // SQLite's LIMIT is a signed 64-bit integer; saturate instead of letting
    // `as` wrap a huge usize into a negative value.
    let sql_limit = i64::try_from(limit).unwrap_or(i64::MAX);
    let cluster_rows: Vec<ClusterRow> = reader
        .interact(move |conn| {
            // `NOT EXISTS` instead of `NOT IN`: a single NULL cluster_id in
            // semantic_abstractions would make `NOT IN` reject every row.
            // The `EXISTS` clause mirrors the join/filter used by
            // `fetch_episodes_for_cluster`, so only clusters that can actually
            // be processed consume the LIMIT budget.
            let mut stmt = conn.prepare(
                "SELECT c.cluster_id, c.centroid, c.centroid_dtype, c.centroid_dim, c.coherence
                 FROM clusters c
                 WHERE NOT EXISTS (
                         SELECT 1 FROM semantic_abstractions sa
                         WHERE sa.cluster_id = c.cluster_id
                       )
                   AND EXISTS (
                         SELECT 1
                         FROM cluster_episodes ce
                         JOIN episodes e ON e.memory_id = ce.memory_id
                         WHERE ce.cluster_id = c.cluster_id
                           AND e.status = 'active'
                       )
                 ORDER BY c.rowid ASC
                 LIMIT ?1",
            )?;
            let iter = stmt.query_map(params![sql_limit], |row| {
                Ok(ClusterRow {
                    cluster_id: row.get::<_, String>(0)?,
                    centroid: row.get::<_, Option<Vec<u8>>>(1)?,
                    centroid_dtype: row.get::<_, Option<String>>(2)?,
                    centroid_dim: row.get::<_, Option<i64>>(3)?,
                    coherence: row.get::<_, f64>(4)?,
                })
            })?;
            let mut rows = Vec::new();
            for r in iter {
                rows.push(r?);
            }
            Ok(rows)
        })
        .await?;
    let mut out: Vec<(Cluster, Vec<Episode>)> = Vec::with_capacity(cluster_rows.len());
    for row in cluster_rows {
        let cluster_id = match MemoryId::from_str(&row.cluster_id) {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(
                    cluster_id = %row.cluster_id,
                    error = %e,
                    "fetch_clusters_without_abstractions: parse cluster_id failed; skipping"
                );
                continue;
            }
        };
        let episodes = fetch_episodes_for_cluster(reader, current_embedder_id, cluster_id).await?;
        // The query above already requires an active member, so an empty
        // result means membership changed between the two reads.
        if episodes.is_empty() {
            tracing::warn!(
                cluster_id = %cluster_id,
                "fetch_clusters_without_abstractions: cluster has no member episodes; skipping"
            );
            continue;
        }
        let episode_ids: Vec<MemoryId> = episodes.iter().map(|e| e.memory_id).collect();
        let centroid = build_centroid(
            row.centroid.as_deref(),
            row.centroid_dtype.as_deref(),
            row.centroid_dim,
        );
        let cluster = Cluster {
            cluster_id,
            episode_ids,
            centroid,
            coherence: row.coherence as f32,
        };
        out.push((cluster, episodes));
    }
    Ok(out)
}
/// Raw `clusters` row as read by `fetch_clusters_without_abstractions`,
/// before id parsing and centroid reconstruction.
#[derive(Debug)]
struct ClusterRow {
    /// Textual cluster id; parsed into a `MemoryId` by the caller.
    cluster_id: String,
    /// Centroid blob, if stored.
    centroid: Option<Vec<u8>>,
    /// Centroid dtype tag (`"f32"`, `"f16"`, `"i8"`, `"binary"` are recognised).
    centroid_dtype: Option<String>,
    /// Centroid dimension as stored.
    centroid_dim: Option<i64>,
    /// Stored coherence score.
    coherence: f64,
}
/// Rebuilds a cluster centroid [`Embedding`] from its raw column values.
///
/// Returns `None` in any of these cases:
/// - the blob is missing;
/// - `dim` is missing, zero, negative, or does not fit in `usize`;
/// - `dtype_str` is missing or is not one of `"f32"`, `"f16"`, `"i8"` or
///   `"binary"`.
///
/// The bytes are copied verbatim. Their length is not checked against
/// `dim`/`dtype`.
fn build_centroid(
    bytes: Option<&[u8]>,
    dtype_str: Option<&str>,
    dim: Option<i64>,
) -> Option<Embedding> {
    let raw = bytes?;
    let dim = dim
        .and_then(|stored| usize::try_from(stored).ok())
        .filter(|&d| d != 0)?;
    let dtype = match dtype_str? {
        "f32" => EmbeddingDtype::F32,
        "f16" => EmbeddingDtype::F16,
        "i8" => EmbeddingDtype::I8,
        "binary" => EmbeddingDtype::Binary,
        _ => return None,
    };
    Some(Embedding {
        data: Vec::from(raw),
        dim,
        dtype,
    })
}
/// Loads the active member episodes of `cluster_id`, ordered by `ts_ms`
/// ascending.
///
/// Episodes are joined through `cluster_episodes` and filtered to
/// `status = 'active'`. Fields the query does not select get fixed values:
/// `Default` encoding context, no provenance, no source id, and `Tier::Hot`.
/// If `Confidence::new` rejects the stored confidence, `0.5` is used instead.
///
/// `current_embedder_id` is accepted but not consulted by the query.
///
/// # Errors
/// Returns the reader's error if the query fails, or `Error::storage` if a
/// stored `memory_id` does not parse. NOTE(review): that parse error
/// propagates out of the whole batch tick, unlike the per-cluster skip for bad
/// cluster ids in `fetch_clusters_without_abstractions`.
async fn fetch_episodes_for_cluster(
    reader: &ReaderPool,
    current_embedder_id: i64,
    cluster_id: MemoryId,
) -> Result<Vec<Episode>> {
    const EPISODES_FOR_CLUSTER_SQL: &str = "SELECT e.memory_id, e.ts_ms, e.source_type, e.content,
e.confidence, e.strength, e.salience
FROM episodes e
JOIN cluster_episodes ce ON ce.memory_id = e.memory_id
WHERE ce.cluster_id = ?1
AND e.status = 'active'
ORDER BY e.ts_ms ASC";
    // Not used by the query; kept so the signature stays stable.
    let _ = current_embedder_id;
    let cluster_key = cluster_id.to_string();
    let rows: Vec<EpisodeRow> = reader
        .interact(move |conn| {
            let mut stmt = conn.prepare(EPISODES_FOR_CLUSTER_SQL)?;
            let mapped = stmt.query_map(params![cluster_key], |row| {
                Ok(EpisodeRow {
                    memory_id_s: row.get(0)?,
                    ts_ms: row.get(1)?,
                    source_type: row.get(2)?,
                    content: row.get(3)?,
                    confidence: row.get(4)?,
                    strength: row.get(5)?,
                    salience: row.get(6)?,
                })
            })?;
            let mut collected = Vec::new();
            for item in mapped {
                collected.push(item?);
            }
            Ok(collected)
        })
        .await?;
    // Stops at the first row whose id fails to parse.
    rows.into_iter().map(EpisodeRow::into_episode).collect()
}
/// Raw `episodes` row as read by `fetch_episodes_for_cluster`.
#[derive(Debug)]
struct EpisodeRow {
    memory_id_s: String,
    ts_ms: i64,
    source_type: String,
    content: String,
    confidence: f64,
    strength: f64,
    salience: f64,
}
impl EpisodeRow {
    /// Converts the raw row into an [`Episode`], parsing its id and filling
    /// the columns the query does not read with fixed values.
    fn into_episode(self) -> Result<Episode> {
        let memory_id = MemoryId::from_str(&self.memory_id_s).map_err(|e| {
            Error::storage(format!(
                "fetch_episodes_for_cluster: parse memory_id {}: {e}",
                self.memory_id_s
            ))
        })?;
        Ok(Episode {
            memory_id,
            ts_ms: self.ts_ms,
            source_type: self.source_type,
            content: self.content,
            encoding_context: Default::default(),
            provenance: None,
            confidence: solo_core::Confidence::new(self.confidence as f32)
                .unwrap_or(solo_core::Confidence(0.5)),
            strength: self.strength as f32,
            salience: self.salience as f32,
            tier: Tier::Hot,
            source_id: None,
        })
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the count-based `TriplesBatchSignal`, plus end-to-end
    // tests that drive `run_triples_batch_tick` and
    // `WriteHandle::attach_abstraction_batch` against an on-disk test DB.
    use super::*;
    use crate::audit::AuditOperation;
    use crate::embedder_registry::{EmbedderIdentity, get_or_insert_embedder_id};
    use crate::test_support::{StubVectorIndex, open_test_db_at};
    use crate::writer::{WriterActor, WriterSpawn};
    use solo_core::{Confidence, SemanticAbstraction};
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};
    use std::sync::Arc;
    use std::time::Duration as StdDuration;
    use tokio::sync::RwLock as AsyncRwLock;

    /// Registers the `stub`/`v1`/f32 embedder identity shared by every
    /// DB-backed test in this module and returns its id.
    fn register_stub_embedder(path: &std::path::Path, dim: usize) -> i64 {
        let conn = open_test_db_at(path);
        get_or_insert_embedder_id(
            &conn,
            &EmbedderIdentity {
                name: "stub".into(),
                version: "v1".into(),
                dim: dim as u32,
                dtype: "f32".into(),
            },
        )
        .unwrap()
    }

    #[test]
    fn count_based_trigger_fires_when_threshold_reached() {
        let runtime = rt_multi();
        runtime.block_on(async {
            let signal = Arc::new(TriplesBatchSignal::new(5));
            let writer_signal = signal.clone();
            let writer = tokio::spawn(async move {
                for _ in 0..5 {
                    writer_signal.note_episode_remembered();
                    tokio::task::yield_now().await;
                }
            });
            let result = tokio::time::timeout(
                StdDuration::from_secs(2),
                signal.notified(),
            )
            .await;
            writer.await.unwrap();
            assert!(
                result.is_ok(),
                "count-based trigger MUST fire once the writer reaches \
                 the threshold; instead the test timed out (the M1 \
                 regression — `trigger_episode_count` wasn't actually \
                 wired and the daemon's select arm never received any \
                 ping)."
            );
            assert!(
                signal.episodes_since_batch() >= 5,
                "counter must have reached the threshold; got {}",
                signal.episodes_since_batch()
            );
        });
    }

    #[test]
    fn time_and_count_triggers_dont_double_run() {
        let runtime = rt_multi();
        runtime.block_on(async {
            let signal = Arc::new(TriplesBatchSignal::new(1));
            signal.note_episode_remembered();
            let first = tokio::time::timeout(
                StdDuration::from_millis(500),
                signal.notified(),
            )
            .await;
            assert!(first.is_ok(), "first notification must arrive");
            signal.reset();
            assert_eq!(signal.episodes_since_batch(), 0);
            let second = tokio::time::timeout(
                StdDuration::from_millis(200),
                signal.notified(),
            )
            .await;
            assert!(
                second.is_err(),
                "after reset, the count-based arm MUST wait for \
                 fresh accumulation; if it fired immediately the \
                 daemon would batch-run twice for one count crossing"
            );
        });
    }

    #[test]
    fn count_resets_after_batch() {
        let runtime = rt_multi();
        runtime.block_on(async {
            let signal = Arc::new(TriplesBatchSignal::new(3));
            for _ in 0..3 {
                signal.note_episode_remembered();
            }
            let first = tokio::time::timeout(
                StdDuration::from_millis(500),
                signal.notified(),
            )
            .await;
            assert!(first.is_ok());
            signal.reset();
            assert_eq!(signal.episodes_since_batch(), 0);
            signal.note_episode_remembered();
            signal.note_episode_remembered();
            assert_eq!(signal.episodes_since_batch(), 2);
            let mid = tokio::time::timeout(
                StdDuration::from_millis(150),
                signal.notified(),
            )
            .await;
            assert!(
                mid.is_err(),
                "below-threshold increments must NOT fire the count arm"
            );
            signal.note_episode_remembered();
            let second = tokio::time::timeout(
                StdDuration::from_millis(500),
                signal.notified(),
            )
            .await;
            assert!(
                second.is_ok(),
                "post-reset, the threshold must fire on its own count \
                 (the counter is per-batch, not cumulative across resets)"
            );
        });
    }

    #[test]
    fn count_based_trigger_disabled_when_threshold_is_zero() {
        let runtime = rt_multi();
        runtime.block_on(async {
            let signal = Arc::new(TriplesBatchSignal::new(0));
            for _ in 0..100 {
                signal.note_episode_remembered();
            }
            assert_eq!(
                signal.episodes_since_batch(),
                0,
                "with threshold==0, the increment must be a no-op"
            );
            let attempt = tokio::time::timeout(
                StdDuration::from_millis(200),
                signal.notified(),
            )
            .await;
            assert!(
                attempt.is_err(),
                "threshold==0 must NOT fire the count arm — the daemon \
                 falls back to the time-based path only"
            );
        });
    }

    /// Small multi-threaded runtime so spawned tasks and the writer thread
    /// can make progress alongside the test body.
    fn rt_multi() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .unwrap()
    }

    /// Steward whose stub LLM always answers with one abstraction carrying a
    /// single literal triple, and which reports itself as a real LLM.
    fn make_steward_with_canned_abstraction() -> Arc<Steward> {
        let canned = r#"{
            "content": "Test abstraction",
            "confidence": 0.7,
            "triples": [
                { "subject_id": "user", "predicate": "likes",
                  "object_id": "pasta", "object_kind": "literal" }
            ]
        }"#;
        let stub = StubLlmClient::with_canned("stub-llm", canned)
            .pretend_real_llm(true);
        Arc::new(Steward::new(Arc::new(stub), StewardConfig::default()))
    }

    /// Inserts one cluster with a fixed 4-d f32 centroid and `n_episodes`
    /// member episodes, each with an embedding under `embedder_id`.
    fn seed_cluster_with_episodes(
        path: &std::path::Path,
        embedder_id: i64,
        cluster_id: MemoryId,
        n_episodes: usize,
    ) {
        let conn = open_test_db_at(path);
        let now_ms = chrono::Utc::now().timestamp_millis();
        let centroid = bytemuck::cast_slice(&[1.0f32, 0.0, 0.0, 0.0]).to_vec();
        conn.execute(
            "INSERT INTO clusters (cluster_id, centroid, centroid_dtype, centroid_dim, coherence, created_at_ms)
             VALUES (?, ?, 'f32', 4, 0.9, ?)",
            params![cluster_id.to_string(), centroid, now_ms],
        )
        .unwrap();
        for i in 0..n_episodes {
            let mid = MemoryId::new();
            conn.execute(
                "INSERT INTO episodes (memory_id, ts_ms, source_type, content,
                 encoding_context_json, confidence, strength, salience,
                 tier, created_at_ms, updated_at_ms)
                 VALUES (?, ?, 'user_message', ?, '{}', 0.9, 0.5, 0.5, 'hot', ?, ?)",
                params![mid.to_string(), now_ms + i as i64 * 1000, format!("ep-{i}"), now_ms, now_ms],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO embeddings (memory_id, embedder_id, dtype, dim, vector, created_at_ms)
                 VALUES (?, ?, 'f32', 4, ?, ?)",
                params![mid.to_string(), embedder_id, centroid, now_ms],
            )
            .unwrap();
            conn.execute(
                "INSERT INTO cluster_episodes (cluster_id, memory_id) VALUES (?, ?)",
                params![cluster_id.to_string(), mid.to_string()],
            )
            .unwrap();
        }
    }

    #[test]
    fn run_triples_batch_tick_persists_abstractions_and_emits_audit_row() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let cluster_id = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, cluster_id, 3);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let reader_pool = ReaderPool::new(&path, None, hnsw.clone()).unwrap();
            let steward = make_steward_with_canned_abstraction();
            let slot = Arc::new(AsyncRwLock::new(Some(steward)));
            let report = run_triples_batch_tick(
                &reader_pool,
                &handle,
                &slot,
                embedder_id,
                100,
                StdDuration::from_secs(60),
                None,
            )
            .await
            .expect("tick ok")
            .expect("Some(report) — we have one un-abstracted cluster");
            assert_eq!(report.abstractions_built, 1);
            assert_eq!(report.triples_extracted, 1);
            assert_eq!(report.clusters_failed, 0);
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
        let read = open_test_db_at(&path);
        let n_abs: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM semantic_abstractions",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_abs, 1);
        let n_triples: i64 = read
            .query_row("SELECT COUNT(*) FROM triples", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n_triples, 1);
        let (op_str, details_str): (String, Option<String>) = read
            .query_row(
                "SELECT operation, details_json FROM audit_events
                 WHERE operation = ?
                 ORDER BY ts_ms DESC, audit_id DESC LIMIT 1",
                params![AuditOperation::MemoryTriplesExtract.as_str()],
                |r| Ok((r.get(0)?, r.get(1)?)),
            )
            .unwrap();
        assert_eq!(op_str, "memory.triples_extract");
        let details: serde_json::Value =
            serde_json::from_str(&details_str.unwrap()).unwrap();
        assert_eq!(details["abstractions_built"], 1);
        assert_eq!(details["triples_extracted"], 1);
        assert_eq!(details["cluster_count"], 1);
        assert_eq!(details["clusters_failed"], 0);
        assert_eq!(details["clusters_deferred"], 0);
        assert_eq!(details["episode_count"], 3);
        assert!(details["duration_ms"].is_number());
    }

    #[test]
    fn run_triples_batch_tick_returns_none_when_slot_empty() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let reader_pool = ReaderPool::new(&path, None, hnsw.clone()).unwrap();
            let slot: Arc<AsyncRwLock<Option<Arc<Steward>>>> =
                Arc::new(AsyncRwLock::new(None));
            let report = run_triples_batch_tick(
                &reader_pool,
                &handle,
                &slot,
                embedder_id,
                100,
                StdDuration::from_secs(60),
                None,
            )
            .await
            .expect("tick ok");
            assert!(
                report.is_none(),
                "empty slot must short-circuit without dispatching a batch"
            );
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
    }

    #[test]
    fn run_triples_batch_tick_returns_none_when_no_clusters_need_abstraction() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let cluster_id = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, cluster_id, 3);
        {
            // Give the only cluster an abstraction so the tick has nothing to do.
            let conn = open_test_db_at(&path);
            let now_ms = chrono::Utc::now().timestamp_millis();
            conn.execute(
                "INSERT INTO semantic_abstractions
                 (abstraction_id, cluster_id, content, provenance_json,
                  confidence, created_at_ms)
                 VALUES (?, ?, ?, '{}', 0.8, ?)",
                params![
                    MemoryId::new().to_string(),
                    cluster_id.to_string(),
                    "pre-existing",
                    now_ms,
                ],
            )
            .unwrap();
        }
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let reader_pool = ReaderPool::new(&path, None, hnsw.clone()).unwrap();
            let steward = make_steward_with_canned_abstraction();
            let slot = Arc::new(AsyncRwLock::new(Some(steward)));
            let report = run_triples_batch_tick(
                &reader_pool,
                &handle,
                &slot,
                embedder_id,
                100,
                StdDuration::from_secs(60),
                None,
            )
            .await
            .expect("tick ok");
            assert!(
                report.is_none(),
                "no clusters need abstraction → tick is a no-op"
            );
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
    }

    #[test]
    fn run_triples_batch_tick_skips_when_steward_has_no_llm() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let cluster_id = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, cluster_id, 3);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let reader_pool = ReaderPool::new(&path, None, hnsw.clone()).unwrap();
            let stub_no_llm = Arc::new(StubLlmClient::default_stub());
            let steward = Arc::new(Steward::new(stub_no_llm, StewardConfig::default()));
            let slot = Arc::new(AsyncRwLock::new(Some(steward)));
            let report = run_triples_batch_tick(
                &reader_pool,
                &handle,
                &slot,
                embedder_id,
                100,
                StdDuration::from_secs(60),
                None,
            )
            .await
            .expect("tick ok");
            assert!(
                report.is_none(),
                "stub-only steward → tick is a no-op"
            );
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
    }

    #[test]
    fn attach_abstraction_batch_persists_multi_cluster_batch() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let c1 = MemoryId::new();
        let c2 = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, c1, 2);
        seed_cluster_with_episodes(&path, embedder_id, c2, 3);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let abs1 = mk_abs(c1, 2);
            let abs2 = mk_abs(c2, 1);
            let items = vec![(c1, abs1), (c2, abs2)];
            let report = handle
                .attach_abstraction_batch(items, 5, 42, 0, Some("op".into()))
                .await
                .expect("batch ok");
            assert_eq!(report.abstractions_built, 2);
            assert_eq!(report.triples_extracted, 3);
            assert_eq!(report.clusters_failed, 0);
            assert_eq!(report.clusters_deferred, 0);
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
        let read = open_test_db_at(&path);
        let n_abs: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM semantic_abstractions",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_abs, 2);
        let n_triples: i64 = read
            .query_row("SELECT COUNT(*) FROM triples", [], |r| r.get(0))
            .unwrap();
        assert_eq!(n_triples, 3);
        let (audit_count,): (i64,) = read
            .query_row(
                "SELECT COUNT(*) FROM audit_events WHERE operation = 'memory.triples_extract'",
                [],
                |r| Ok((r.get(0)?,)),
            )
            .unwrap();
        assert_eq!(
            audit_count, 1,
            "one batch must emit exactly one MemoryTriplesExtract audit row"
        );
    }

    #[test]
    fn attach_abstraction_batch_is_idempotent_on_re_run() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let c = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, c, 2);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let abs_v1 = mk_abs(c, 1);
            let r1 = handle
                .attach_abstraction_batch(vec![(c, abs_v1)], 2, 10, 0, None)
                .await
                .expect("v1 batch ok");
            assert_eq!(r1.abstractions_built, 1);
            assert_eq!(r1.triples_extracted, 1);
            let abs_v2 = mk_abs(c, 3);
            let r2 = handle
                .attach_abstraction_batch(vec![(c, abs_v2)], 2, 10, 0, None)
                .await
                .expect("v2 batch ok");
            assert_eq!(r2.abstractions_built, 1);
            assert_eq!(r2.triples_extracted, 3);
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
        let read = open_test_db_at(&path);
        let n_abs: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM semantic_abstractions",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(
            n_abs, 1,
            "second batch replaces the first cluster's abstraction; only one row remains"
        );
        let n_triples: i64 = read
            .query_row("SELECT COUNT(*) FROM triples", [], |r| r.get(0))
            .unwrap();
        assert_eq!(
            n_triples, 3,
            "the second batch's 3 triples replaced the first batch's 1"
        );
        let (audit_count,): (i64,) = read
            .query_row(
                "SELECT COUNT(*) FROM audit_events WHERE operation = 'memory.triples_extract'",
                [],
                |r| Ok((r.get(0)?,)),
            )
            .unwrap();
        assert_eq!(audit_count, 2);
    }

    #[test]
    fn attach_abstraction_batch_rejects_mismatched_cluster_id() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        // Use the id the registry actually assigned rather than assuming `1`.
        let embedder_id = register_stub_embedder(&path, dim);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let a_id = MemoryId::new();
            let b_id = MemoryId::new();
            // The abstraction belongs to `b_id` but the batch item is keyed by `a_id`.
            let abs = mk_abs(b_id, 0);
            let err = handle
                .attach_abstraction_batch(vec![(a_id, abs)], 0, 0, 0, None)
                .await
                .expect_err("mismatch must error");
            assert!(
                format!("{err}").contains("cluster_id mismatch"),
                "expected mismatch error; got: {err}"
            );
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
    }

    #[test]
    fn attach_abstraction_batch_partial_failure_preserves_other_clusters() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let c1 = MemoryId::new();
        let c2 = MemoryId::new();
        let c3 = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, c1, 1);
        seed_cluster_with_episodes(&path, embedder_id, c2, 1);
        seed_cluster_with_episodes(&path, embedder_id, c3, 1);
        let stale_abs_id = MemoryId::new();
        {
            // Cluster 2 starts with an existing abstraction that must survive
            // its failed replacement.
            let conn = open_test_db_at(&path);
            let now_ms = chrono::Utc::now().timestamp_millis();
            conn.execute(
                "INSERT INTO semantic_abstractions
                 (abstraction_id, cluster_id, content, provenance_json,
                  confidence, created_at_ms)
                 VALUES (?, ?, 'STALE-CONTENT', '{}', 0.5, ?)",
                params![stale_abs_id.to_string(), c2.to_string(), now_ms],
            )
            .unwrap();
        }
        let collision_triple_id = MemoryId::new();
        {
            // A triple owned by cluster 3 whose id cluster 2's new triple will
            // reuse, forcing cluster 2's insert to fail.
            let conn = open_test_db_at(&path);
            let now_ms = chrono::Utc::now().timestamp_millis();
            conn.execute(
                "INSERT INTO triples
                 (triple_id, subject_id, predicate, object_id, object_kind,
                  valid_from_ms, valid_to_ms, confidence, provenance_json,
                  created_at_ms, updated_at_ms, cluster_id, source_episode_id)
                 VALUES (?, ?, ?, ?, 'literal', 0, NULL, 0.5, '{}', ?, ?, ?, NULL)",
                params![
                    collision_triple_id.to_string(),
                    "foreign-subj",
                    "foreign-pred",
                    "foreign-obj",
                    now_ms,
                    now_ms,
                    c3.to_string()
                ],
            )
            .unwrap();
        }
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let abs1 = mk_abs(c1, 1);
            let mut abs2 = mk_abs(c2, 1);
            abs2.triples[0].triple_id = collision_triple_id;
            let abs3 = mk_abs(c3, 1);
            let report = handle
                .attach_abstraction_batch(
                    vec![(c1, abs1), (c2, abs2), (c3, abs3)],
                    3,
                    100,
                    0,
                    None,
                )
                .await
                .expect("batch must succeed at the outer-tx level");
            assert_eq!(report.abstractions_built, 2,
                "clusters 1+3 should both land their new abstractions");
            assert_eq!(report.clusters_failed, 1,
                "cluster 2's INSERT must fail and be booked as a failure");
            assert_eq!(report.triples_extracted, 2,
                "two successful clusters' triples (1 each)");
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
        let read = open_test_db_at(&path);
        let c2_abs_id: Option<String> = read
            .query_row(
                "SELECT abstraction_id FROM semantic_abstractions WHERE cluster_id = ?",
                params![c2.to_string()],
                |r| r.get(0),
            )
            .ok();
        assert_eq!(
            c2_abs_id.as_deref(),
            Some(stale_abs_id.to_string().as_str()),
            "cluster 2's pre-existing abstraction must be preserved \
             (savepoint rollback undid the per-cluster DELETE)"
        );
        let n_c1: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM semantic_abstractions WHERE cluster_id = ?",
                params![c1.to_string()],
                |r| r.get(0),
            )
            .unwrap();
        let n_c3: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM semantic_abstractions WHERE cluster_id = ?",
                params![c3.to_string()],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(n_c1, 1, "cluster 1 has a fresh abstraction");
        assert_eq!(n_c3, 1, "cluster 3 has a fresh abstraction");
        let n_triples_c3: i64 = read
            .query_row(
                "SELECT COUNT(*) FROM triples WHERE cluster_id = ?",
                params![c3.to_string()],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(
            n_triples_c3, 1,
            "cluster 3's successful run replaced the pre-seeded \
             collision row with its own fresh triple"
        );
        let (audit_count,): (i64,) = read
            .query_row(
                "SELECT COUNT(*) FROM audit_events WHERE operation = 'memory.triples_extract'",
                [],
                |r| Ok((r.get(0)?,)),
            )
            .unwrap();
        assert_eq!(audit_count, 1);
    }

    #[test]
    fn triples_batch_audit_row_includes_deferred_count() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("test.db");
        let dim = 4usize;
        let embedder_id = register_stub_embedder(&path, dim);
        let c1 = MemoryId::new();
        seed_cluster_with_episodes(&path, embedder_id, c1, 2);
        let runtime = rt_multi();
        runtime.block_on(async {
            let conn = open_test_db_at(&path);
            let hnsw = Arc::new(StubVectorIndex::new(dim));
            let WriterSpawn { handle, join } =
                WriterActor::spawn_full_with_embedder_and_optional_steward(
                    conn,
                    hnsw.clone(),
                    tmp.path().to_path_buf(),
                    embedder_id,
                    Arc::new(crate::embedder::StubEmbedder::new("stub", "v1", dim)),
                    None,
                );
            let abs1 = mk_abs(c1, 1);
            let report = handle
                .attach_abstraction_batch(
                    vec![(c1, abs1)],
                    2,
                    100,
                    2,
                    None,
                )
                .await
                .expect("batch ok");
            assert_eq!(report.abstractions_built, 1);
            assert_eq!(
                report.clusters_deferred, 2,
                "report must echo the caller's deferred count"
            );
            drop(handle);
            tokio::task::spawn_blocking(move || join.join().unwrap())
                .await
                .unwrap();
        });
        let read = open_test_db_at(&path);
        let details_str: Option<String> = read
            .query_row(
                "SELECT details_json FROM audit_events
                 WHERE operation = 'memory.triples_extract'
                 ORDER BY ts_ms DESC, audit_id DESC LIMIT 1",
                [],
                |r| r.get(0),
            )
            .unwrap();
        let details: serde_json::Value =
            serde_json::from_str(&details_str.unwrap()).unwrap();
        assert_eq!(
            details["clusters_deferred"], 2,
            "the m5 deferred counter must surface in the audit row's \
             details_json so operators see 'something is slow' in the \
             audit log without grepping `tracing::warn!` lines"
        );
        assert_eq!(details["abstractions_built"], 1);
    }

    /// Builds an abstraction for `cluster_id` with `n_triples` distinct
    /// literal triples (`pred_i` / `obj_i`) and fresh ids.
    fn mk_abs(cluster_id: MemoryId, n_triples: usize) -> SemanticAbstraction {
        use solo_core::{Provenance, Triple, TripleObjectKind};
        let mut triples: Vec<Triple> = Vec::new();
        for i in 0..n_triples {
            triples.push(Triple {
                triple_id: MemoryId::new(),
                subject_id: "subject".into(),
                predicate: format!("pred_{i}"),
                object_id: format!("obj_{i}"),
                object_kind: TripleObjectKind::Literal,
                valid_from_ms: 0,
                valid_to_ms: None,
                confidence: Confidence::new(0.8).unwrap(),
                provenance: Provenance {
                    derived_from: vec![],
                    derivation: "consolidation".into(),
                    by: "stub-llm".into(),
                    at_ms: 0,
                },
            });
        }
        SemanticAbstraction {
            abstraction_id: MemoryId::new(),
            cluster_id,
            content: "test abs".into(),
            confidence: Confidence::new(0.7).unwrap(),
            triples,
            provenance: Provenance {
                derived_from: vec![],
                derivation: "consolidation".into(),
                by: "stub-llm".into(),
                at_ms: 0,
            },
        }
    }
}