use parking_lot::Mutex;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use yantrikdb::YantrikDB;
use crate::commit::{ApplyError, EngineResolver, TenantId};
use crate::config::ServerConfig;
use crate::control::{ControlDb, DatabaseRecord};
use crate::embedder::FastEmbedder;
/// Cache of opened [`YantrikDB`] engines, one per database record.
///
/// Engines are opened on first request by `get_engine` and then shared as
/// `Arc` handles until they are removed with `evict`.
pub struct TenantPool {
/// Opened engines keyed by `DatabaseRecord::id`.
engines: Mutex<HashMap<i64, Arc<YantrikDB>>>,
/// Root directory; each database's `path` is joined onto it to locate its files.
data_dir: PathBuf,
/// Embedding dimension handed to the `YantrikDB` constructors.
embedding_dim: usize,
/// Optional embedder; when present it is attached to every newly opened engine.
embedder: Option<FastEmbedder>,
/// Optional 32-byte key; when present engines are opened with `YantrikDB::new_encrypted`.
master_key: Option<[u8; 32]>,
}
impl TenantPool {
    /// Creates an empty pool. No engine is opened until [`TenantPool::get_engine`]
    /// is called.
    ///
    /// `data_dir` and `embedding_dim` are copied out of `config`; `embedder` and
    /// `master_key` are stored as given.
    pub fn new(
        config: &ServerConfig,
        embedder: Option<FastEmbedder>,
        master_key: Option<[u8; 32]>,
    ) -> Self {
        Self {
            engines: Mutex::new(HashMap::new()),
            data_dir: config.server.data_dir.clone(),
            embedding_dim: config.embedding.dim,
            embedder,
            master_key,
        }
    }

    /// Returns `true` when the pool was constructed with a master key, i.e. when
    /// engines are opened through `YantrikDB::new_encrypted`.
    // NOTE(review): no caller is visible in this part of the file, hence the
    // allow — confirm it is still needed.
    #[allow(dead_code)]
    pub fn is_encrypted(&self) -> bool {
        self.master_key.is_some()
    }

    /// Returns the engine for `db_record`, opening and caching it on first use.
    ///
    /// The engine file is `<data_dir>/<db_record.path>/yantrik.db`; the directory
    /// is created if it does not exist. When the pool has an embedder it is
    /// attached to the engine before the engine is shared.
    ///
    /// The pool lock is held for the whole call, so two callers asking for the
    /// same id cannot both open the engine; it also means other callers wait
    /// while an engine is being opened.
    ///
    /// # Errors
    ///
    /// Returns an error when the database path is not valid UTF-8, when the
    /// directory cannot be created, or when the engine constructor fails.
    pub fn get_engine(&self, db_record: &DatabaseRecord) -> anyhow::Result<Arc<YantrikDB>> {
        let mut engines = self.engines.lock();
        if let Some(engine) = engines.get(&db_record.id) {
            return Ok(Arc::clone(engine));
        }

        let db_dir = self.data_dir.join(&db_record.path);
        let db_path = db_dir.join("yantrik.db");
        // The engine constructors take the path as `&str`. Falling back to a
        // bare "yantrik.db" for a non-UTF-8 path would open a file relative to
        // the working directory — shared by every tenant hitting the fallback —
        // so fail loudly instead.
        let db_path_str = db_path.to_str().ok_or_else(|| {
            anyhow::anyhow!(
                "database path {} is not valid UTF-8; refusing to open engine for db_id={}",
                db_path.display(),
                db_record.id
            )
        })?;
        std::fs::create_dir_all(&db_dir)?;

        let mut engine = if let Some(ref key) = self.master_key {
            YantrikDB::new_encrypted(db_path_str, self.embedding_dim, key)?
        } else {
            YantrikDB::new(db_path_str, self.embedding_dim)?
        };
        if let Some(ref emb) = self.embedder {
            engine.set_embedder(emb.boxed());
        }

        let engine = Arc::new(engine);
        engines.insert(db_record.id, Arc::clone(&engine));
        tracing::info!(db_name = %db_record.name, db_id = db_record.id, "loaded engine");
        Ok(engine)
    }

    /// Removes the cached engine for `db_id`, if any.
    ///
    /// Only the pool's own `Arc` is dropped; handles already returned by
    /// [`TenantPool::get_engine`] remain usable by their holders.
    // NOTE(review): no caller is visible in this part of the file, hence the
    // allow — confirm it is still needed.
    #[allow(dead_code)]
    pub fn evict(&self, db_id: i64) {
        self.engines.lock().remove(&db_id);
    }

    /// Number of engines currently cached in the pool.
    pub fn loaded_count(&self) -> usize {
        self.engines.lock().len()
    }

    /// The embedder the pool was constructed with, if any.
    pub fn embedder(&self) -> Option<&FastEmbedder> {
        self.embedder.as_ref()
    }

    /// Root directory under which per-database directories are located.
    pub fn data_dir(&self) -> &Path {
        &self.data_dir
    }
}
/// [`EngineResolver`] implementation backed by a [`TenantPool`].
///
/// Resolution looks the tenant's database record up in the control database
/// and then asks the pool for the matching engine.
pub struct TenantPoolEngineResolver {
/// Pool that opens and caches the engines.
pool: Arc<TenantPool>,
/// Control database used to map a tenant id to its `DatabaseRecord`.
control: Arc<Mutex<ControlDb>>,
}
impl TenantPoolEngineResolver {
pub fn new(pool: Arc<TenantPool>, control: Arc<Mutex<ControlDb>>) -> Self {
Self { pool, control }
}
}
impl EngineResolver for TenantPoolEngineResolver {
    /// Resolves `tenant_id` to its engine.
    ///
    /// The database record is fetched from the control database by id, then the
    /// pool is asked for the engine belonging to that record.
    ///
    /// # Errors
    ///
    /// Every failure is reported as `ApplyError::EngineFailure`: a failed
    /// control-database lookup, a tenant id with no record, or a failure of
    /// `TenantPool::get_engine`.
    fn resolve(&self, tenant_id: TenantId) -> Result<Arc<YantrikDB>, ApplyError> {
        let id = tenant_id.0;
        let failure = |message: String| ApplyError::EngineFailure { message };

        // Scoped so the control lock is released before the pool is touched.
        let record = {
            let control_db = self.control.lock();
            match control_db.get_database_by_id(id) {
                Ok(Some(found)) => found,
                Ok(None) => {
                    return Err(failure(format!(
                        "tenant_id={id} not found in control DB — followers must \
                         replicate the database row before applying mutations against \
                         it (RFC 010 PR-6 control-plane replication contract)"
                    )));
                }
                Err(e) => {
                    return Err(failure(format!("control DB lookup tenant_id={id}: {e}")));
                }
            }
        };

        self.pool
            .get_engine(&record)
            .map_err(|e| failure(format!("tenant_pool.get_engine(tenant_id={id}): {e}")))
    }
}
/// Returns the record of the database named `"default"`, creating it when the
/// control database has none.
///
/// On creation the directory `<data_dir>/default` is created first, then the
/// row is inserted with name and path both set to `"default"`.
///
/// The record returned for a freshly created database carries an empty
/// `created_at`; it is assembled locally rather than read back.
/// NOTE(review): callers needing the stored timestamp presumably have to
/// re-query the control database — confirm.
///
/// # Errors
///
/// Propagates failures from the control-database lookup, the directory
/// creation, and the insert.
pub fn ensure_default_database(
    control: &ControlDb,
    data_dir: &Path,
) -> anyhow::Result<DatabaseRecord> {
    // Used both as the database name and as its directory below `data_dir`.
    const DEFAULT_DB: &str = "default";

    match control.get_database(DEFAULT_DB)? {
        Some(existing) => Ok(existing),
        None => {
            std::fs::create_dir_all(data_dir.join(DEFAULT_DB))?;
            let new_id = control.create_database(DEFAULT_DB, DEFAULT_DB)?;
            Ok(DatabaseRecord {
                id: new_id,
                name: DEFAULT_DB.into(),
                path: DEFAULT_DB.into(),
                created_at: String::new(),
            })
        }
    }
}