#![cfg(test)]
use crate::config::EmbedderConfig;
use crate::init::{InitParams, init};
use crate::key_material::KeyMaterial;
use crate::tenants::{TenantRegistry, TenantRegistryParams};
use crate::vector_index::HnswParams;
use solo_core::{Embedder, TenantId};
use std::sync::Arc;
use zeroize::Zeroizing;
/// Creates a temporary data directory, runs `init` in it with a 32-dimensional
/// `stub`/`v1` embedder configuration, and derives the key material from
/// `passphrase` plus the salt recorded in the generated `solo.config.toml`.
///
/// Returns the `TempDir` guard together with the derived key; the caller has
/// to keep the guard alive for as long as the directory is needed.
///
/// Panics if any step fails, which is the desired behaviour inside tests.
fn fresh_init_dir(passphrase: &str) -> (tempfile::TempDir, KeyMaterial) {
    let scratch = tempfile::TempDir::new().unwrap();

    let embedder = EmbedderConfig {
        name: "stub".into(),
        version: "v1".into(),
        dim: 32,
        dtype: "f32".into(),
    };
    let params = InitParams {
        data_dir: scratch.path().to_path_buf(),
        passphrase: Zeroizing::new(passphrase.into()),
        force: false,
        embedder,
    };
    let _ = init(params).unwrap();

    // Read back the config that `init` wrote so the key uses the stored salt.
    let config_path = scratch.path().join("solo.config.toml");
    let config = crate::config::SoloConfig::read(&config_path).unwrap();
    let salt = config.salt_bytes().unwrap();
    let derived = KeyMaterial::derive(passphrase, &salt).unwrap();

    (scratch, derived)
}
/// Returns a shared `StubEmbedder` constructed with name `stub`, version `v1`
/// and the value `32`, matching the embedder config written by
/// `fresh_init_dir`.
fn stub_embedder() -> Arc<dyn Embedder> {
    let stub = crate::embedder::StubEmbedder::new("stub", "v1", 32);
    Arc::new(stub)
}
/// Builds a multi-threaded Tokio runtime with two worker threads and all
/// drivers enabled. Panics if the runtime cannot be built.
fn rt() -> tokio::runtime::Runtime {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(2).enable_all();
    builder.build().unwrap()
}
/// Opens a `TenantRegistry` over `data_dir` for use in tests.
///
/// The registry is configured with the stub embedder, default HNSW
/// parameters, no steward, no steward factory and no triples batch signal.
/// `runtime`'s handle is passed through as the registry's runtime handle.
///
/// # Panics
///
/// Panics with `"open registry"` if `TenantRegistry::open` returns an error.
fn make_registry(
    data_dir: &std::path::Path,
    key: &KeyMaterial,
    runtime: &tokio::runtime::Runtime,
) -> Arc<TenantRegistry> {
    // `runtime` is consumed below via `handle()`, so no placeholder binding is
    // needed to keep the parameter "used".
    let params = TenantRegistryParams {
        data_dir: data_dir.to_path_buf(),
        key: key.clone(),
        embedder: stub_embedder(),
        hnsw_params: HnswParams::default(),
        steward: None,
        runtime_handle: Some(runtime.handle().clone()),
        steward_factory: None,
        triples_batch_signal: None,
    };
    Arc::new(TenantRegistry::open(params).expect("open registry"))
}
/// After `init`, the registry's active list contains exactly one entry and
/// that entry is the default tenant.
#[test]
fn tenant_registry_open_lists_default_tenant() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let active = registry.list_active().await.unwrap();

        let active_count = active.len();
        assert_eq!(active_count, 1);

        let expected = TenantId::default_tenant();
        assert_eq!(active[0].tenant_id, expected);
    };
    runtime.block_on(scenario);
}
/// Writes one episode through the default tenant's handle, drops that handle,
/// fetches a handle again and reads the episode's content back via SQL.
///
/// NOTE(review): the handle is never evicted between the two `get_or_open`
/// calls, so the second call may be served from the registry cache rather
/// than performing a real reopen — confirm this matches the intent of the
/// test name.
#[test]
fn tenant_handle_lifecycle_open_remember_recall_reopen() {
    use chrono::Utc;
    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};

    const CONTENT: &str = "lifecycle test";

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let tenant = TenantId::default_tenant();

        // Scope the first handle so it is released before the second lookup.
        let stored_id = {
            let writer_handle = registry.get_or_open(&tenant).await.unwrap();
            let episode = Episode {
                memory_id: MemoryId::new(),
                ts_ms: Utc::now().timestamp_millis(),
                source_type: "user_message".into(),
                source_id: None,
                content: CONTENT.into(),
                encoding_context: EncodingContext::default(),
                provenance: None,
                confidence: Confidence::new(0.9).unwrap(),
                strength: 0.5,
                salience: 0.5,
                tier: Tier::Hot,
            };
            let embedding = stub_embedder().embed(CONTENT).await.unwrap();
            writer_handle
                .write()
                .remember(episode, embedding)
                .await
                .unwrap()
        };

        let memory_key = stored_id.to_string();
        let reader_handle = registry.get_or_open(&tenant).await.unwrap();
        let content: String = reader_handle
            .read()
            .interact(move |conn| {
                conn.query_row(
                    "SELECT content FROM episodes WHERE memory_id = ?",
                    [&memory_key],
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();

        assert_eq!(content, CONTENT);
    });
}
/// Two consecutive `get_or_open` calls for the same tenant must hand back the
/// very same `Arc` allocation.
#[test]
fn tenant_registry_lazy_load_returns_cached_arc() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let tenant = TenantId::default_tenant();

        let initial = registry.get_or_open(&tenant).await.unwrap();
        let repeat = registry.get_or_open(&tenant).await.unwrap();

        assert!(Arc::ptr_eq(&initial, &repeat));
    };
    runtime.block_on(scenario);
}
/// Spawns 32 tasks that all call `get_or_open` for the default tenant and
/// checks that every task received the same `Arc` as the first one.
#[test]
fn tenant_registry_concurrent_first_access_single_open() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let id = TenantId::default_tenant();

        // Spawn every task up front; `collect` drives the spawns eagerly.
        let pending: Vec<_> = (0..32)
            .map(|_| {
                let shared = registry.clone();
                let tenant = id.clone();
                tokio::spawn(async move { shared.get_or_open(&tenant).await.unwrap() })
            })
            .collect();

        let mut opened: Vec<_> = Vec::new();
        for task in pending {
            opened.push(task.await.unwrap());
        }

        let reference = &opened[0];
        for other in opened.iter().skip(1) {
            assert!(Arc::ptr_eq(reference, other));
        }
    });
}
/// Asking for a tenant id that was never registered must fail with
/// `solo_core::Error::NotFound`.
#[test]
fn tenant_registry_unknown_tenant_returns_not_found() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let stranger = TenantId::new("never-registered").unwrap();

        let outcome = registry.get_or_open(&stranger).await;
        if let Err(e) = outcome {
            assert!(
                matches!(e, solo_core::Error::NotFound(_)),
                "expected NotFound, got {e:?}"
            );
        } else {
            panic!("expected NotFound, got Ok");
        }
    });
}
/// `forget_handle` must return the cached handle, leave the tenant reported
/// as not open, and cause the next `get_or_open` to produce a different
/// `Arc`.
#[test]
fn tenant_registry_forget_handle_evicts_from_cache() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let tenant = TenantId::default_tenant();

        let before_eviction = registry.get_or_open(&tenant).await.unwrap();
        assert!(registry.is_open(&tenant).await);

        // Keep the evicted value bound so it lives until the end of the test,
        // exactly like the handle obtained above.
        let removed = registry.forget_handle(&tenant).await;
        assert!(removed.is_some());
        assert!(!registry.is_open(&tenant).await);

        let after_eviction = registry.get_or_open(&tenant).await.unwrap();
        assert!(!Arc::ptr_eq(&before_eviction, &after_eviction));
    };
    runtime.block_on(scenario);
}
/// An all-uppercase id is rejected, and the error's display text mentions
/// "invalid" (compared case-insensitively).
#[test]
fn tenant_id_validation_rejects_uppercase() {
    let rejection = TenantId::new("UPPERCASE-IS-BAD").unwrap_err();
    let msg = rejection.to_string();
    let lowered = msg.to_lowercase();
    assert!(lowered.contains("invalid"), "got: {msg}");
}
/// A 65-character id is rejected, and the error's display text mentions
/// either "too long" or "64".
#[test]
fn tenant_id_validation_rejects_too_long() {
    let oversized: String = std::iter::repeat('a').take(65).collect();
    let rejection = TenantId::new(oversized).unwrap_err();
    let msg = rejection.to_string();
    let mentions_limit = ["too long", "64"]
        .iter()
        .any(|needle| msg.contains(*needle));
    assert!(mentions_limit, "got: {msg}");
}
/// Registers a second tenant (`alpha`) next to the default one, writes a
/// single episode into the default tenant and checks that `alpha` still has
/// zero episodes while the default tenant has exactly one.
#[test]
fn tenant_per_tenant_isolation_writes_dont_cross_over() {
    use chrono::Utc;
    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};

    const EPISODE_COUNT_SQL: &str = "SELECT COUNT(*) FROM episodes";
    const DEFAULT_CONTENT: &str = "from default tenant";

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);

        // Create and migrate the second tenant's database by hand, then
        // record it in the registry's index.
        let alpha = TenantId::new("alpha").unwrap();
        let alpha_file = "alpha.db";
        let alpha_path = scratch.path().join("tenants").join(alpha_file);
        {
            let mut conn = crate::init::open_sqlcipher(&alpha_path, &key).unwrap();
            crate::migration::run_migrations(&mut conn).unwrap();
        }
        registry
            .with_index(|index| {
                index
                    .register(&alpha, alpha_file, Some("Alpha tenant"))
                    .unwrap();
            })
            .await;

        // Write one episode through the default tenant only.
        let default_tenant = TenantId::default_tenant();
        let default_handle = registry.get_or_open(&default_tenant).await.unwrap();
        let episode = Episode {
            memory_id: MemoryId::new(),
            ts_ms: Utc::now().timestamp_millis(),
            source_type: "user_message".into(),
            source_id: None,
            content: DEFAULT_CONTENT.into(),
            encoding_context: EncodingContext::default(),
            provenance: None,
            confidence: Confidence::new(0.9).unwrap(),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
        };
        let embedding = stub_embedder().embed(DEFAULT_CONTENT).await.unwrap();
        default_handle
            .write()
            .remember(episode, embedding)
            .await
            .unwrap();

        let alpha_handle = registry.get_or_open(&alpha).await.unwrap();
        let alpha_rows: i64 = alpha_handle
            .read()
            .interact(|conn| conn.query_row(EPISODE_COUNT_SQL, [], |row| row.get(0)))
            .await
            .unwrap();
        assert_eq!(alpha_rows, 0, "alpha tenant must be empty");

        let default_rows: i64 = default_handle
            .read()
            .interact(|conn| conn.query_row(EPISODE_COUNT_SQL, [], |row| row.get(0)))
            .await
            .unwrap();
        assert_eq!(default_rows, 1);
    });
}
/// Registers a second tenant (`beta`), writes one episode into the default
/// tenant and checks that the two handles expose different HNSW `Arc`s with
/// lengths 1 (default) and 0 (beta).
#[test]
fn tenant_registry_two_tenants_have_separate_hnsw_indexes() {
    use chrono::Utc;
    use solo_core::{Confidence, EncodingContext, Episode, MemoryId, Tier};

    const CONTENT: &str = "vector test";

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);

        // Create and migrate the second tenant's database by hand, then
        // record it in the registry's index.
        let beta = TenantId::new("beta").unwrap();
        let beta_file = "beta.db";
        let beta_path = scratch.path().join("tenants").join(beta_file);
        {
            let mut conn = crate::init::open_sqlcipher(&beta_path, &key).unwrap();
            crate::migration::run_migrations(&mut conn).unwrap();
        }
        registry
            .with_index(|index| {
                index.register(&beta, beta_file, None).unwrap();
            })
            .await;

        let default_tenant = TenantId::default_tenant();
        let default_handle = registry.get_or_open(&default_tenant).await.unwrap();
        let episode = Episode {
            memory_id: MemoryId::new(),
            ts_ms: Utc::now().timestamp_millis(),
            source_type: "test".into(),
            source_id: None,
            content: CONTENT.into(),
            encoding_context: EncodingContext::default(),
            provenance: None,
            confidence: Confidence::new(0.9).unwrap(),
            strength: 0.5,
            salience: 0.5,
            tier: Tier::Hot,
        };
        let embedding = stub_embedder().embed(CONTENT).await.unwrap();
        default_handle
            .write()
            .remember(episode, embedding)
            .await
            .unwrap();

        let beta_handle = registry.get_or_open(&beta).await.unwrap();
        assert!(!Arc::ptr_eq(default_handle.hnsw(), beta_handle.hnsw()));
        assert_eq!(default_handle.hnsw().len(), 1);
        assert_eq!(beta_handle.hnsw().len(), 0);
    });
}
/// Opens the default tenant, calls `shutdown_all`, drops the registry, then
/// opens a second registry over the same directory and checks that it still
/// lists exactly one active tenant.
///
/// NOTE(review): `shutdown_all` is invoked only once here; if the name is
/// meant to cover repeated shutdown calls, that is not exercised — confirm.
#[test]
fn tenant_handle_shutdown_idempotency() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        // The first registry lives only inside this scope.
        {
            let first_registry = make_registry(scratch.path(), &key, &runtime);
            let default_tenant = TenantId::default_tenant();
            let _ = first_registry.get_or_open(&default_tenant).await.unwrap();
            first_registry.shutdown_all().await;
        }

        let second_registry = make_registry(scratch.path(), &key, &runtime);
        let active = second_registry.list_active().await.unwrap();
        assert_eq!(active.len(), 1);
    };
    runtime.block_on(scenario);
}
/// Builds a data directory by hand that contains only a root-level `solo.db`
/// (with one episode) plus a `solo.config.toml`, then opens a registry over it.
///
/// The test asserts that after the registry is opened `tenants_index.db` and
/// `tenants/default.db` exist, `solo.db` no longer exists, and the episode
/// inserted beforehand is still countable through the default tenant.
/// Judging by the test name, the hand-built layout stands in for a v0.7.1
/// install — TODO confirm.
#[test]
fn tenant_registry_open_on_fresh_v071_install_runs_migration() {
use chrono::Utc;
use rusqlite::params;
// `init` is deliberately not used here: the directory is populated manually.
let tmp = tempfile::TempDir::new().unwrap();
// The same fixed 16-byte salt feeds both the key derivation and the
// `salt_hex` value written into the config file below.
let key = KeyMaterial::derive("v071-test", &[7u8; 16]).unwrap();
let salt_hex = hex::encode([7u8; 16]);
// Single database file placed directly at the data-dir root.
let legacy = tmp.path().join("solo.db");
{
// Scoped so the connection is closed before the registry is opened.
let mut conn = crate::init::open_sqlcipher(&legacy, &key).unwrap();
crate::migration::run_migrations(&mut conn).unwrap();
let now = Utc::now().timestamp_millis();
// Seed exactly one episode; the four placeholders are memory_id, ts_ms,
// created_at_ms and updated_at_ms.
conn.execute(
"INSERT INTO episodes (
memory_id, ts_ms, source_type, content,
encoding_context_json, confidence, strength, salience,
tier, created_at_ms, updated_at_ms
) VALUES (?, ?, 'user_message', 'pre-upgrade',
'{}', 1.0, 0.5, 0.5, 'hot', ?, ?)",
params!["00000000-0000-0000-0000-000000000001", now, now, now],
)
.unwrap();
}
// Hand-written config carrying the salt and a stub embedder section with
// dim = 32.
let cfg_body = format!(
"schema_version = 1\nsalt_hex = \"{salt_hex}\"\n\n[embedder]\nname = \"stub\"\nversion = \"v1\"\ndim = 32\ndtype = \"f32\"\n"
);
std::fs::write(tmp.path().join("solo.config.toml"), cfg_body).unwrap();
let runtime = rt();
runtime.block_on(async {
let registry = make_registry(tmp.path(), &key, &runtime);
// Filesystem layout expected once the registry has been opened.
assert!(tmp.path().join("tenants_index.db").exists());
assert!(tmp.path().join("tenants").join("default.db").exists());
assert!(!tmp.path().join("solo.db").exists());
let h = registry
.get_or_open(&TenantId::default_tenant())
.await
.unwrap();
// The seeded row must be visible through the default tenant's handle.
let count: i64 = h
.read()
.interact(|conn| conn.query_row("SELECT COUNT(*) FROM episodes", [], |r| r.get(0)))
.await
.unwrap();
assert_eq!(count, 1, "pre-upgrade episode must survive");
});
}
/// Opens a `TenantRegistry` over `data_dir` for use in tests, wiring in the
/// given optional steward `factory`.
///
/// Apart from `steward_factory`, the registry is configured with the stub
/// embedder, default HNSW parameters, no eager steward and no triples batch
/// signal. `runtime`'s handle is passed through as the registry's runtime
/// handle.
///
/// # Panics
///
/// Panics with `"open registry"` if `TenantRegistry::open` returns an error.
fn make_registry_with_factory(
    data_dir: &std::path::Path,
    key: &KeyMaterial,
    runtime: &tokio::runtime::Runtime,
    factory: Option<Arc<dyn crate::steward_factory::StewardFactory>>,
) -> Arc<TenantRegistry> {
    // `runtime` is consumed below via `handle()`, so no placeholder binding is
    // needed to keep the parameter "used".
    let params = TenantRegistryParams {
        data_dir: data_dir.to_path_buf(),
        key: key.clone(),
        embedder: stub_embedder(),
        hnsw_params: HnswParams::default(),
        steward: None,
        runtime_handle: Some(runtime.handle().clone()),
        steward_factory: factory,
        triples_batch_signal: None,
    };
    Arc::new(TenantRegistry::open(params).expect("open registry"))
}
/// Builds a `StaticStewardFactory` wrapping a `Steward` that is backed by the
/// stub LLM client and the default steward configuration.
fn static_factory_with_stub_steward() -> Arc<dyn crate::steward_factory::StewardFactory> {
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    let llm: Arc<dyn solo_core::LlmClient> = Arc::new(StubLlmClient::default_stub());
    let config = StewardConfig::default();
    let stub_steward = Arc::new(Steward::new(llm, config));
    let factory = crate::steward_factory::StaticStewardFactory::new(stub_steward);
    Arc::new(factory)
}
/// With a static steward factory wired in, the steward slot of a freshly
/// opened tenant handle must already hold a value.
#[test]
fn tenant_opened_with_static_factory_has_populated_steward_slot() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let factory = static_factory_with_stub_steward();
        let registry = make_registry_with_factory(scratch.path(), &key, &runtime, Some(factory));

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");

        let slot = handle.steward_slot().read().await;
        let populated = slot.is_some();
        assert!(
            populated,
            "static factory must eagerly populate the slot at open time"
        );
    };
    runtime.block_on(scenario);
}
/// With the MCP-sampling steward factory wired in, the steward slot of a
/// freshly opened tenant handle must still be empty.
#[test]
fn tenant_opened_with_mcp_sampling_factory_has_empty_steward_slot() {
    use crate::steward_factory::{McpSamplingStewardFactory, StewardFactory};

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let factory: Arc<dyn StewardFactory> = Arc::new(McpSamplingStewardFactory::new());
        let registry = make_registry_with_factory(scratch.path(), &key, &runtime, Some(factory));

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");

        let slot = handle.steward_slot().read().await;
        let empty = slot.is_none();
        assert!(
            empty,
            "MCP-sampling factory's eager-populate is a no-op; slot must be empty until \
             SoloMcpServer::initialize writes into it"
        );
    };
    runtime.block_on(scenario);
}
/// With neither a steward factory nor an eager steward configured, the
/// steward slot of a freshly opened tenant handle must be empty.
#[test]
fn tenant_opened_without_factory_slot_starts_empty_when_steward_is_none() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    let scenario = async {
        let registry = make_registry(scratch.path(), &key, &runtime);

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");

        let slot = handle.steward_slot().read().await;
        let empty = slot.is_none();
        assert!(
            empty,
            "no factory + no steward → slot is empty (v0.8.x backwards-compat shape)"
        );
    };
    runtime.block_on(scenario);
}
/// Starting from an empty steward slot (MCP-sampling factory), writes a
/// steward into the slot after the tenant has been opened and checks that a
/// subsequent read returns the very same `Arc`.
#[test]
fn steward_slot_supports_post_open_mutation() {
    use crate::steward_factory::{McpSamplingStewardFactory, StewardFactory};
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let factory: Arc<dyn StewardFactory> = Arc::new(McpSamplingStewardFactory::new());
        let registry = make_registry_with_factory(scratch.path(), &key, &runtime, Some(factory));

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");
        assert!(handle.steward_slot().read().await.is_none());

        let llm: Arc<dyn solo_core::LlmClient> = Arc::new(StubLlmClient::default_stub());
        let new_steward = Arc::new(Steward::new(llm, StewardConfig::default()));

        // The write guard is a temporary of this statement, so it is released
        // before the read guard below is requested.
        *handle.steward_slot().write().await = Some(Arc::clone(&new_steward));

        let slot = handle.steward_slot().read().await;
        let observed = slot.as_ref().expect("populated after write");
        assert!(
            Arc::ptr_eq(observed, &new_steward),
            "slot read must return the Arc that was written in"
        );
    });
}
/// Captures the steward stored in the slot, then spawns eight tasks that each
/// read the slot and compare what they see against that baseline by pointer.
#[test]
fn steward_slot_concurrent_reads_observe_same_value() {
    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let factory = static_factory_with_stub_steward();
        let registry = make_registry_with_factory(scratch.path(), &key, &runtime, Some(factory));

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");

        // Scope the guard so it is released before the reader tasks start.
        let baseline_arc = {
            let guard = handle.steward_slot().read().await;
            guard.as_ref().map(Arc::clone).expect("populated")
        };

        let shared_slot = handle.steward_slot().clone();
        let mut readers = Vec::with_capacity(8);
        for _ in 0..8 {
            let slot = shared_slot.clone();
            let baseline = baseline_arc.clone();
            readers.push(tokio::spawn(async move {
                let guard = slot.read().await;
                let observed = guard.as_ref().expect("populated").clone();
                Arc::ptr_eq(&observed, &baseline)
            }));
        }

        for reader in readers {
            assert!(reader.await.expect("task join"), "torn read detected");
        }
    });
}
/// With no steward factory but an eager `steward` parameter supplied, the
/// steward slot of an opened tenant handle must contain that same `Arc`.
#[test]
fn tenant_opened_without_factory_slot_mirrors_eager_steward_param() {
    use solo_steward::test_support::StubLlmClient;
    use solo_steward::{Steward, StewardConfig};

    let (scratch, key) = fresh_init_dir("alpha-tester");
    let runtime = rt();
    runtime.block_on(async {
        let llm: Arc<dyn solo_core::LlmClient> = Arc::new(StubLlmClient::default_stub());
        let eager_steward = Arc::new(Steward::new(llm, StewardConfig::default()));

        // Built inline because the shared helpers always pass `steward: None`.
        let params = TenantRegistryParams {
            data_dir: scratch.path().to_path_buf(),
            key: key.clone(),
            embedder: stub_embedder(),
            hnsw_params: HnswParams::default(),
            steward: Some(eager_steward.clone()),
            runtime_handle: Some(runtime.handle().clone()),
            steward_factory: None,
            triples_batch_signal: None,
        };
        let registry = Arc::new(TenantRegistry::open(params).expect("open registry"));

        let default_tenant = TenantId::default_tenant();
        let handle = registry.get_or_open(&default_tenant).await.expect("open");

        let slot = handle.steward_slot().read().await;
        let observed = slot.as_ref().expect("slot mirrors eager steward");
        assert!(
            Arc::ptr_eq(observed, &eager_steward),
            "slot must mirror the eager steward parameter when no factory is wired"
        );
    });
}
/// Before the first `get_or_open`, the listed tenant has no `last_accessed_ms`;
/// afterwards it carries a timestamp that lies between the wall-clock readings
/// taken immediately before and after the call.
#[test]
fn get_or_open_first_call_populates_last_accessed() {
    let (scratch, key) = fresh_init_dir("la-first-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let tenant = TenantId::default_tenant();

        let listed_before = registry.list_active().await.unwrap();
        assert_eq!(listed_before.len(), 1);
        assert_eq!(
            listed_before[0].last_accessed_ms, None,
            "pre-first-open tenant must have last_accessed = NULL"
        );

        // Bracket the open with two clock readings to bound the stamp.
        let before_ms = chrono::Utc::now().timestamp_millis();
        let _handle = registry.get_or_open(&tenant).await.unwrap();
        let after_ms = chrono::Utc::now().timestamp_millis();

        let listed_after = registry.list_active().await.unwrap();
        let ms = listed_after[0]
            .last_accessed_ms
            .expect("get_or_open must stamp last_accessed on cache miss");
        assert!(
            (before_ms..=after_ms).contains(&ms),
            "last_accessed ms {ms} must fall within [{before_ms}, {after_ms}]"
        );
    });
}
/// A second `get_or_open` for an already opened tenant, issued 10 ms after
/// the first, must leave a strictly larger `last_accessed_ms` in the listing.
#[test]
fn get_or_open_cache_hit_updates_last_accessed() {
    let (scratch, key) = fresh_init_dir("la-cache-tester");
    let runtime = rt();
    runtime.block_on(async {
        let registry = make_registry(scratch.path(), &key, &runtime);
        let tenant = TenantId::default_tenant();

        let _first_handle = registry.get_or_open(&tenant).await.unwrap();
        let listing_after_first = registry.list_active().await.unwrap();
        let t0_stamp = listing_after_first[0]
            .last_accessed_ms
            .expect("first stamp");

        // Let the millisecond clock advance so the two stamps can differ.
        let pause = std::time::Duration::from_millis(10);
        tokio::time::sleep(pause).await;

        let _second_handle = registry.get_or_open(&tenant).await.unwrap();
        let listing_after_second = registry.list_active().await.unwrap();
        let t1_stamp = listing_after_second[0]
            .last_accessed_ms
            .expect("second stamp");

        assert!(
            t1_stamp > t0_stamp,
            "cache-hit stamp {t1_stamp} must be greater than initial stamp {t0_stamp}"
        );
    });
}