pub mod handle;
pub mod migrate;
pub mod registry;
#[cfg(test)]
mod handle_registry_tests;
pub use handle::{TenantHandle, TenantOpenParams};
pub use migrate::migrate_v071_to_v080;
pub use registry::{TenantRegistry, TenantRegistryParams};
use rusqlite::{Connection, OptionalExtension, params};
use solo_core::{Error, Result, TenantId};
use std::path::Path;
use crate::init::open_sqlcipher;
use crate::key_material::KeyMaterial;
use crate::migration;
/// File name of the tenants index database; `TenantsIndex::open` joins it onto the data directory.
pub const TENANTS_INDEX_FILENAME: &str = "tenants_index.db";
/// Name of the tenants subdirectory.
/// NOTE(review): presumably the directory under the data dir that holds the per-tenant
/// database files — it is not referenced in this part of the file; confirm against callers.
pub const TENANTS_SUBDIR: &str = "tenants";
/// Lifecycle state of a tenant, persisted as text in the `status` column of the `tenants` table.
///
/// serde uses the `snake_case` names (`active`, `pending_migration`, `pending_delete`),
/// which are the same strings `as_sql_str` writes to the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TenantStatus {
/// Stored as `active`; the status used by `register` and `register_with_quota`.
Active,
/// Stored as `pending_migration`.
/// NOTE(review): presumably a tenant whose database still awaits migration — confirm.
PendingMigration,
/// Stored as `pending_delete`.
/// NOTE(review): presumably a tenant scheduled for deletion — confirm.
PendingDelete,
}
impl TenantStatus {
pub fn as_sql_str(&self) -> &'static str {
match self {
Self::Active => "active",
Self::PendingMigration => "pending_migration",
Self::PendingDelete => "pending_delete",
}
}
fn parse(s: &str) -> Result<Self> {
match s {
"active" => Ok(Self::Active),
"pending_migration" => Ok(Self::PendingMigration),
"pending_delete" => Ok(Self::PendingDelete),
other => Err(Error::storage(format!(
"unknown tenant status from registry: {other:?}"
))),
}
}
}
/// One row of the `tenants` table, decoded into Rust types.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct TenantRecord {
/// Tenant identifier (`tenant_id` column), validated through `TenantId::new` when read from the index.
pub tenant_id: TenantId,
/// Value of the `db_filename` column.
/// NOTE(review): presumably the tenant's database file name — confirm how callers resolve it.
pub db_filename: String,
/// Optional display name (`display_name` column; `None` when the column is NULL).
pub display_name: Option<String>,
/// Value of the `created_at_ms` column; registration stamps it with the current UTC time in milliseconds.
pub created_at_ms: i64,
/// Lifecycle state parsed from the `status` column.
pub status: TenantStatus,
/// Quota in bytes. When read from the index this is `None` if the column is NULL or negative.
/// `#[serde(default)]` lets serialized records that lack the field deserialize as `None`.
#[serde(default)]
pub quota_bytes: Option<u64>,
/// Stamp read from the `last_accessed` column; `None` while that column is NULL.
/// `#[serde(default)]` lets serialized records that lack the field deserialize as `None`.
#[serde(default)]
pub last_accessed_ms: Option<i64>,
}
/// Handle to the tenants index database, i.e. the database holding the `tenants` table.
pub struct TenantsIndex {
// Connection on which every method of this type runs its SQL.
conn: Connection,
}
impl TenantsIndex {
/// Opens the tenants index located in `data_dir` and runs its schema migrations.
///
/// The database path is `data_dir` joined with `TENANTS_INDEX_FILENAME`; it is
/// opened through `open_sqlcipher` using `key`, after which
/// `migration::run_tenants_index_migrations` is applied to the connection.
///
/// # Errors
/// Propagates any error from opening the database or from the migrations.
pub fn open(data_dir: &Path, key: &KeyMaterial) -> Result<Self> {
    let index_path = data_dir.join(TENANTS_INDEX_FILENAME);
    let mut index = Self {
        conn: open_sqlcipher(&index_path, key)?,
    };
    migration::run_tenants_index_migrations(&mut index.conn)?;
    Ok(index)
}
/// Registers a new tenant with status [`TenantStatus::Active`] and no quota.
///
/// # Errors
/// Same as the shared insert routine: a conflict error when SQLite reports a
/// primary-key or unique-constraint violation, a storage error otherwise.
pub fn register(
    &mut self,
    tenant_id: &TenantId,
    db_filename: &str,
    display_name: Option<&str>,
) -> Result<()> {
    let no_quota: Option<u64> = None;
    let initial_status = TenantStatus::Active;
    self.register_with_quota_and_status(
        tenant_id,
        db_filename,
        display_name,
        no_quota,
        initial_status,
    )
}
/// Registers a new tenant with the caller-chosen `status` and no quota.
///
/// # Errors
/// Same as the shared insert routine: a conflict error when SQLite reports a
/// primary-key or unique-constraint violation, a storage error otherwise.
pub fn register_with_status(
    &mut self,
    tenant_id: &TenantId,
    db_filename: &str,
    display_name: Option<&str>,
    status: TenantStatus,
) -> Result<()> {
    let no_quota: Option<u64> = None;
    self.register_with_quota_and_status(tenant_id, db_filename, display_name, no_quota, status)
}
/// Registers a new tenant with status [`TenantStatus::Active`] and the given
/// `quota_bytes` (`None` stores no quota).
///
/// # Errors
/// Same as the shared insert routine: a conflict error when SQLite reports a
/// primary-key or unique-constraint violation, a storage error otherwise.
pub fn register_with_quota(
    &mut self,
    tenant_id: &TenantId,
    db_filename: &str,
    display_name: Option<&str>,
    quota_bytes: Option<u64>,
) -> Result<()> {
    let initial_status = TenantStatus::Active;
    self.register_with_quota_and_status(
        tenant_id,
        db_filename,
        display_name,
        quota_bytes,
        initial_status,
    )
}
fn register_with_quota_and_status(
&mut self,
tenant_id: &TenantId,
db_filename: &str,
display_name: Option<&str>,
quota_bytes: Option<u64>,
status: TenantStatus,
) -> Result<()> {
let now_ms: i64 = chrono::Utc::now().timestamp_millis();
let quota_i64: Option<i64> = quota_bytes.map(|q| q.min(i64::MAX as u64) as i64);
let res = self.conn.execute(
"INSERT INTO tenants (tenant_id, db_filename, display_name, created_at_ms, status, quota_bytes)
VALUES (?, ?, ?, ?, ?, ?)",
params![
tenant_id.as_str(),
db_filename,
display_name,
now_ms,
status.as_sql_str(),
quota_i64,
],
);
match res {
Ok(_) => Ok(()),
Err(rusqlite::Error::SqliteFailure(err, msg))
if err.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_PRIMARYKEY
|| err.extended_code == rusqlite::ffi::SQLITE_CONSTRAINT_UNIQUE =>
{
Err(Error::conflict(format!(
"tenant already exists: {} ({})",
tenant_id,
msg.as_deref().unwrap_or("UNIQUE/PK violation")
)))
}
Err(e) => Err(Error::storage(format!("register tenant {tenant_id}: {e}"))),
}
}
pub fn set_status(
&mut self,
tenant_id: &TenantId,
status: TenantStatus,
) -> Result<()> {
self.conn
.execute(
"UPDATE tenants SET status = ? WHERE tenant_id = ?",
params![status.as_sql_str(), tenant_id.as_str()],
)
.map_err(|e| Error::storage(format!("set_status({tenant_id}): {e}")))?;
Ok(())
}
pub fn set_quota(
&mut self,
tenant_id: &TenantId,
quota_bytes: Option<u64>,
) -> Result<usize> {
let quota_i64: Option<i64> = quota_bytes.map(|q| q.min(i64::MAX as u64) as i64);
let updated = self
.conn
.execute(
"UPDATE tenants SET quota_bytes = ? WHERE tenant_id = ?",
params![quota_i64, tenant_id.as_str()],
)
.map_err(|e| Error::storage(format!("set_quota({tenant_id}): {e}")))?;
Ok(updated)
}
pub fn list(&self) -> Result<Vec<TenantRecord>> {
let mut stmt = self
.conn
.prepare(
"SELECT tenant_id, db_filename, display_name, created_at_ms, status, quota_bytes, last_accessed
FROM tenants
ORDER BY created_at_ms ASC, tenant_id ASC",
)
.map_err(|e| Error::storage(format!("prepare list tenants: {e}")))?;
let rows = stmt
.query_map([], row_to_tenant_record)
.map_err(|e| Error::storage(format!("query list tenants: {e}")))?;
let mut out = Vec::new();
for r in rows {
out.push(r.map_err(|e| Error::storage(format!("scan tenant row: {e}")))?);
}
Ok(out)
}
pub fn lookup(&self, tenant_id: &TenantId) -> Result<Option<TenantRecord>> {
self.conn
.query_row(
"SELECT tenant_id, db_filename, display_name, created_at_ms, status, quota_bytes, last_accessed
FROM tenants WHERE tenant_id = ?",
params![tenant_id.as_str()],
row_to_tenant_record,
)
.optional()
.map_err(|e| Error::storage(format!("lookup tenant {tenant_id}: {e}")))
}
pub fn touch_last_accessed(
&mut self,
tenant_id: &TenantId,
now_ms: i64,
) -> Result<usize> {
let updated = self
.conn
.execute(
"UPDATE tenants SET last_accessed = ? WHERE tenant_id = ?",
params![now_ms, tenant_id.as_str()],
)
.map_err(|e| {
Error::storage(format!(
"touch_last_accessed({tenant_id}): {e}"
))
})?;
Ok(updated)
}
pub fn remove(&mut self, tenant_id: &TenantId) -> Result<()> {
self.conn
.execute(
"DELETE FROM tenants WHERE tenant_id = ?",
params![tenant_id.as_str()],
)
.map_err(|e| Error::storage(format!("remove tenant {tenant_id}: {e}")))?;
Ok(())
}
pub(crate) fn connection(&self) -> &Connection {
&self.conn
}
/// Wraps an already-open connection without opening a file or running
/// migrations. Only compiled for tests or with the `test-support` feature.
#[cfg(any(test, feature = "test-support"))]
pub fn from_connection_for_tests(conn: Connection) -> Self {
    TenantsIndex { conn }
}
}
fn row_to_tenant_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<TenantRecord> {
let tenant_id_str: String = row.get(0)?;
let tenant_id = TenantId::new(tenant_id_str.clone()).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
0,
rusqlite::types::Type::Text,
Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid tenant_id in registry: {tenant_id_str}: {e}"),
)),
)
})?;
let db_filename: String = row.get(1)?;
let display_name: Option<String> = row.get(2)?;
let created_at_ms: i64 = row.get(3)?;
let status_str: String = row.get(4)?;
let status = TenantStatus::parse(&status_str).map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(
4,
rusqlite::types::Type::Text,
Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("{e}"),
)),
)
})?;
let quota_bytes: Option<u64> = row
.get::<_, Option<i64>>(5)?
.and_then(|v| if v < 0 { None } else { Some(v as u64) });
let last_accessed_ms: Option<i64> = row.get::<_, Option<i64>>(6)?;
Ok(TenantRecord {
tenant_id,
db_filename,
display_name,
created_at_ms,
status,
quota_bytes,
last_accessed_ms,
})
}
// Unit tests for `TenantsIndex`. Each test opens a fresh index inside its own
// temporary directory, so tests do not share state.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Derives the key from a fixed passphrase and a constant 16-byte salt, so every
// test opens its index with the same key material.
fn fast_test_key() -> KeyMaterial {
let salt = [7u8; 16];
KeyMaterial::derive("registry-test-passphrase", &salt)
.expect("derive test key")
}
// Opens (and migrates) a tenants index inside the given temp dir.
fn open_fresh(tmp: &TempDir) -> TenantsIndex {
let key = fast_test_key();
TenantsIndex::open(tmp.path(), &key).expect("open tenants_index")
}
// Opening must create an empty `tenants` table and record the schema version.
#[test]
fn open_creates_schema() {
let tmp = TempDir::new().unwrap();
let idx = open_fresh(&tmp);
let n: i64 = idx
.conn
.query_row("SELECT COUNT(*) FROM tenants", [], |r| r.get(0))
.unwrap();
assert_eq!(n, 0);
let v: u32 = idx
.conn
.query_row(
"SELECT MAX(version) FROM schema_migrations_tenants_index",
[],
|r| r.get(0),
)
.unwrap();
// NOTE(review): hard-coded latest tenants-index migration version; presumably must be
// bumped whenever a migration is added — confirm against the migration module.
assert_eq!(v, 9);
}
// A registered tenant can be read back with the fields it was registered with.
#[test]
fn register_then_lookup() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("acme").unwrap();
idx.register(&t, "acme.db", Some("ACME Corp")).unwrap();
let rec = idx.lookup(&t).unwrap().expect("tenant must be present");
assert_eq!(rec.tenant_id, t);
assert_eq!(rec.db_filename, "acme.db");
assert_eq!(rec.display_name.as_deref(), Some("ACME Corp"));
assert_eq!(rec.status, TenantStatus::Active);
}
// Registering the same tenant_id twice must surface as `Error::Conflict`.
#[test]
fn register_duplicate_errors() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("dup").unwrap();
idx.register(&t, "dup.db", None).unwrap();
let err = idx
.register(&t, "dup-other.db", None)
.expect_err("duplicate tenant_id must error");
assert!(
matches!(err, Error::Conflict(_)),
"expected Conflict, got {err:?}"
);
}
// `list` returns tenants in the order they were registered.
#[test]
fn list_returns_in_creation_order() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let ids = ["alpha", "beta", "gamma"];
for id in ids {
idx.register(
&TenantId::new(id).unwrap(),
&format!("{id}.db"),
None,
)
.unwrap();
// Keeps the created_at_ms stamps distinct; `list` orders by created_at_ms, then tenant_id.
std::thread::sleep(std::time::Duration::from_millis(2));
}
let listed = idx.list().unwrap();
assert_eq!(listed.len(), 3);
let listed_ids: Vec<&str> =
listed.iter().map(|r| r.tenant_id.as_str()).collect();
assert_eq!(listed_ids, ["alpha", "beta", "gamma"]);
}
// Status changes are visible on the next lookup, for each non-default status.
#[test]
fn set_status_persists() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("statushost").unwrap();
idx.register(&t, "statushost.db", None).unwrap();
idx.set_status(&t, TenantStatus::PendingMigration).unwrap();
let rec = idx.lookup(&t).unwrap().unwrap();
assert_eq!(rec.status, TenantStatus::PendingMigration);
idx.set_status(&t, TenantStatus::PendingDelete).unwrap();
let rec = idx.lookup(&t).unwrap().unwrap();
assert_eq!(rec.status, TenantStatus::PendingDelete);
}
// Removing a tenant that was never registered succeeds and leaves the table empty.
#[test]
fn remove_idempotent_on_missing() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let absent = TenantId::new("ghost").unwrap();
idx.remove(&absent).expect("idempotent remove");
assert_eq!(idx.list().unwrap().len(), 0);
}
// Plain `register` stores no quota.
#[test]
fn register_default_persists_no_quota() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("noquota").unwrap();
idx.register(&t, "noquota.db", None).unwrap();
let rec = idx.lookup(&t).unwrap().expect("registered");
assert_eq!(rec.quota_bytes, None);
}
// `register_with_quota` stores the quota it was given.
#[test]
fn register_with_quota_persists_value() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("limited").unwrap();
idx.register_with_quota(&t, "limited.db", None, Some(1_048_576))
.unwrap();
let rec = idx.lookup(&t).unwrap().expect("registered");
assert_eq!(rec.quota_bytes, Some(1_048_576));
}
// `set_quota` can both replace an existing quota and clear it with `None`.
#[test]
fn set_quota_updates_and_clears() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("quotachange").unwrap();
idx.register_with_quota(&t, "qc.db", None, Some(1_000_000)).unwrap();
let n = idx.set_quota(&t, Some(5_000_000)).unwrap();
assert_eq!(n, 1);
let rec = idx.lookup(&t).unwrap().unwrap();
assert_eq!(rec.quota_bytes, Some(5_000_000));
let n2 = idx.set_quota(&t, None).unwrap();
assert_eq!(n2, 1);
let rec2 = idx.lookup(&t).unwrap().unwrap();
assert_eq!(rec2.quota_bytes, None);
}
// `set_quota` on an unknown tenant is not an error; it reports zero updated rows.
#[test]
fn set_quota_on_missing_tenant_returns_zero_rows() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let n = idx
.set_quota(&TenantId::new("ghost").unwrap(), Some(123))
.unwrap();
assert_eq!(n, 0);
}
// `list` exposes the stored quota, not only `lookup`.
#[test]
fn list_surfaces_quota_bytes() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("listed").unwrap();
idx.register_with_quota(&t, "listed.db", None, Some(42))
.unwrap();
let listed = idx.list().unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].quota_bytes, Some(42));
}
// A freshly registered tenant has no last-accessed stamp.
#[test]
fn register_leaves_last_accessed_null_until_first_touch() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("fresh").unwrap();
idx.register(&t, "fresh.db", None).unwrap();
let rec = idx.lookup(&t).unwrap().expect("registered");
assert_eq!(rec.last_accessed_ms, None);
}
// `touch_last_accessed` stores exactly the millisecond value it is given.
#[test]
fn touch_last_accessed_persists_ms() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("touched").unwrap();
idx.register(&t, "touched.db", None).unwrap();
let now_ms: i64 = chrono::Utc::now().timestamp_millis();
let n = idx.touch_last_accessed(&t, now_ms).unwrap();
assert_eq!(n, 1, "expected exactly one row updated");
let rec = idx.lookup(&t).unwrap().unwrap();
assert_eq!(rec.last_accessed_ms, Some(now_ms));
}
// A second touch replaces the earlier stamp.
#[test]
fn touch_last_accessed_overwrites_prior_stamp() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("rebumped").unwrap();
idx.register(&t, "rebumped.db", None).unwrap();
let first_ms: i64 = 1_000_000_000_000;
idx.touch_last_accessed(&t, first_ms).unwrap();
let second_ms: i64 = 2_000_000_000_000;
idx.touch_last_accessed(&t, second_ms).unwrap();
let rec = idx.lookup(&t).unwrap().unwrap();
assert_eq!(
rec.last_accessed_ms,
Some(second_ms),
"second touch must overwrite first"
);
}
// Touching an unknown tenant is not an error; it reports zero updated rows.
#[test]
fn touch_last_accessed_on_missing_tenant_returns_zero_rows() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let n = idx
.touch_last_accessed(&TenantId::new("ghost").unwrap(), 123)
.unwrap();
assert_eq!(n, 0, "missing tenant must not error and must report 0 rows");
}
// `list` exposes the last-accessed stamp, not only `lookup`.
#[test]
fn list_surfaces_last_accessed_ms() {
let tmp = TempDir::new().unwrap();
let mut idx = open_fresh(&tmp);
let t = TenantId::new("listed-stamp").unwrap();
idx.register(&t, "ls.db", None).unwrap();
let now_ms: i64 = chrono::Utc::now().timestamp_millis();
idx.touch_last_accessed(&t, now_ms).unwrap();
let listed = idx.list().unwrap();
assert_eq!(listed.len(), 1);
assert_eq!(listed[0].last_accessed_ms, Some(now_ms));
}
}