use std::path::Path;
use tracing::info;
use crate::error::Result;
use super::migration::migrate_if_needed;
use super::schema::{
CATALOG_FORMAT_VERSION, GHOST_TABLE, KEY_CA_CERT, KEY_CLUSTER_ID, KEY_FORMAT_VERSION,
METADATA_TABLE, ROUTING_TABLE, TOPOLOGY_TABLE, catalog_err,
};
/// Handle to the on-disk cluster catalog, backed by a single `redb` database.
///
/// Obtain one with [`ClusterCatalog::open`]; the methods on this type read and
/// write entries of the metadata table inside their own transactions.
pub struct ClusterCatalog {
    /// The underlying `redb` database. Visible to the parent module only
    /// (`pub(super)`).
    pub(super) db: redb::Database,
}
impl ClusterCatalog {
pub fn open(path: &Path) -> Result<Self> {
let db =
redb::Database::create(path).map_err(|e| crate::error::ClusterError::Transport {
detail: format!("open cluster catalog {}: {e}", path.display()),
})?;
let txn = db.begin_write().map_err(catalog_err)?;
{
let _ = txn.open_table(TOPOLOGY_TABLE).map_err(catalog_err)?;
let _ = txn.open_table(ROUTING_TABLE).map_err(catalog_err)?;
let _ = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
let _ = txn.open_table(GHOST_TABLE).map_err(catalog_err)?;
}
txn.commit().map_err(catalog_err)?;
migrate_if_needed(&db)?;
info!(
path = %path.display(),
format_version = CATALOG_FORMAT_VERSION,
"cluster catalog opened"
);
Ok(Self { db })
}
pub fn save_cluster_id(&self, cluster_id: u64) -> Result<()> {
let bytes = cluster_id.to_le_bytes();
let txn = self.db.begin_write().map_err(catalog_err)?;
{
let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
table
.insert(KEY_CLUSTER_ID, bytes.as_slice())
.map_err(catalog_err)?;
}
txn.commit().map_err(catalog_err)?;
Ok(())
}
pub fn load_cluster_id(&self) -> Result<Option<u64>> {
let txn = self.db.begin_read().map_err(catalog_err)?;
let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
match table.get(KEY_CLUSTER_ID).map_err(catalog_err)? {
Some(guard) => {
let bytes = guard.value();
if bytes.len() == 8 {
let mut arr = [0u8; 8];
arr.copy_from_slice(bytes);
let id = u64::from_le_bytes(arr);
Ok(Some(id))
} else {
Ok(None)
}
}
None => Ok(None),
}
}
pub fn is_bootstrapped(&self) -> Result<bool> {
self.load_cluster_id().map(|id| id.is_some())
}
pub fn save_ca_cert(&self, ca_cert_der: &[u8]) -> Result<()> {
let txn = self.db.begin_write().map_err(catalog_err)?;
{
let mut table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
table
.insert(KEY_CA_CERT, ca_cert_der)
.map_err(catalog_err)?;
}
txn.commit().map_err(catalog_err)?;
Ok(())
}
pub fn load_ca_cert(&self) -> Result<Option<Vec<u8>>> {
let txn = self.db.begin_read().map_err(catalog_err)?;
let table = txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
match table.get(KEY_CA_CERT).map_err(catalog_err)? {
Some(guard) => Ok(Some(guard.value().to_vec())),
None => Ok(None),
}
}
}
/// Reads the catalog format version stamped in the metadata table of `db`.
///
/// The version is stored as 4 little-endian bytes. `Ok(None)` is returned when
/// no version is stored, and also when the stored value is not exactly 4 bytes
/// long.
///
/// # Errors
///
/// Returns the `catalog_err` mapping of any transaction, table or lookup
/// failure.
pub(super) fn read_format_version(db: &redb::Database) -> Result<Option<u32>> {
    let read_txn = db.begin_read().map_err(catalog_err)?;
    let metadata = read_txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
    let stored = metadata.get(KEY_FORMAT_VERSION).map_err(catalog_err)?;
    // The array conversion succeeds only for a 4-byte slice; any other length
    // collapses to `None`.
    let version = stored
        .and_then(|guard| <[u8; 4]>::try_from(guard.value()).ok())
        .map(u32::from_le_bytes);
    Ok(version)
}
/// Stamps `version` into the metadata table of `db` as 4 little-endian bytes.
///
/// The write runs in its own transaction and replaces any previously stored
/// format version.
///
/// # Errors
///
/// Returns the `catalog_err` mapping of any transaction, table or commit
/// failure.
pub(super) fn write_format_version(db: &redb::Database, version: u32) -> Result<()> {
    let encoded = version.to_le_bytes();
    let write_txn = db.begin_write().map_err(catalog_err)?;
    let mut metadata = write_txn.open_table(METADATA_TABLE).map_err(catalog_err)?;
    metadata
        .insert(KEY_FORMAT_VERSION, encoded.as_slice())
        .map_err(catalog_err)?;
    // The table handle has to be released before the transaction commits.
    drop(metadata);
    write_txn.commit().map_err(catalog_err)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a fresh temporary directory together with the catalog path
    /// inside it. The `TempDir` must be kept alive for as long as the path is
    /// in use.
    fn scratch_path() -> (tempfile::TempDir, std::path::PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("cluster.redb");
        (tmp, db_path)
    }

    /// Opens a brand-new catalog in its own temporary directory.
    fn temp_catalog() -> (tempfile::TempDir, ClusterCatalog) {
        let (tmp, db_path) = scratch_path();
        let catalog = ClusterCatalog::open(&db_path).unwrap();
        (tmp, catalog)
    }

    /// A cluster id is absent on a fresh catalog and readable once saved.
    #[test]
    fn cluster_id_persistence() {
        let (_tmp, catalog) = temp_catalog();

        // Before saving: not bootstrapped, nothing to load.
        assert!(!catalog.is_bootstrapped().unwrap());
        assert_eq!(catalog.load_cluster_id().unwrap(), None);

        catalog.save_cluster_id(42).unwrap();

        // After saving: bootstrapped, and the same id comes back.
        assert!(catalog.is_bootstrapped().unwrap());
        assert_eq!(catalog.load_cluster_id().unwrap(), Some(42));
    }

    /// Opening a new catalog leaves the current format version on disk.
    #[test]
    fn fresh_catalog_is_stamped_with_current_format_version() {
        let (_tmp, db_path) = scratch_path();

        // Open and immediately close the catalog so the file can be reopened
        // as a raw database below.
        drop(ClusterCatalog::open(&db_path).unwrap());

        let raw = redb::Database::create(&db_path).unwrap();
        let version = read_format_version(&raw).unwrap();
        assert_eq!(version, Some(CATALOG_FORMAT_VERSION));
    }

    /// Opening an already-current catalog a second time keeps its version.
    #[test]
    fn reopening_current_catalog_is_idempotent() {
        let (_tmp, db_path) = scratch_path();

        // Each handle is dropped before the next open of the same file.
        drop(ClusterCatalog::open(&db_path).unwrap());
        drop(ClusterCatalog::open(&db_path).unwrap());

        let raw = redb::Database::create(&db_path).unwrap();
        assert_eq!(
            read_format_version(&raw).unwrap(),
            Some(CATALOG_FORMAT_VERSION)
        );
    }

    /// A catalog stamped with a version above the current one is rejected.
    #[test]
    fn future_format_version_is_refused() {
        let (_tmp, db_path) = scratch_path();

        // Create the catalog, then close it again.
        drop(ClusterCatalog::open(&db_path).unwrap());

        // Overwrite the stamp with a version one past the supported one.
        let raw = redb::Database::create(&db_path).unwrap();
        write_format_version(&raw, CATALOG_FORMAT_VERSION + 1).unwrap();
        drop(raw);

        let err = match ClusterCatalog::open(&db_path) {
            Ok(_) => panic!("expected downgrade refusal, got Ok"),
            Err(e) => e,
        };
        let msg = err.to_string();
        assert!(
            msg.contains("newer than supported"),
            "expected a clear downgrade refusal, got: {msg}"
        );
    }
}