pub mod m001;
pub mod m002;
pub mod m003;
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use thiserror::Error;
use crate::core::registry::IndexHandle;
pub use m001::M001PerPubConstRust;
pub use m002::M002AbsoluteToRelativePaths;
pub use m003::M003HnswKeyRelativization;
/// Schema version of a fully migrated index.
///
/// A unit test in this file asserts that `MigrationRegistry::new().current_version()`
/// equals this constant, so the two must be kept in sync when a migration is added.
pub const CURRENT_SCHEMA_VERSION: u32 = 3;
/// redb table definition named `_meta`, keyed by `&str` with raw byte (`&[u8]`) values.
pub(crate) const META_TABLE: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new("_meta");
/// The key string `"schema_version"`.
///
/// NOTE(review): presumably the entry of [`META_TABLE`] under which the schema
/// version is persisted; the reader/writer is not visible here, so confirm
/// against the corpus store implementation.
pub(crate) const META_KEY_SCHEMA_VERSION: &str = "schema_version";
/// Errors produced by [`run_migrations`].
#[derive(Debug, Error)]
pub enum MigrationError {
/// A migration's `apply` step returned an error.
///
/// The fields identify the failing migration; `source` is the error it returned.
#[error("migration {from}→{to} ({description}) failed: {source}")]
Apply {
/// `source_version()` of the migration that failed.
from: u32,
/// `target_version()` of the migration that failed.
to: u32,
/// `description()` of the migration that failed.
description: &'static str,
/// The error returned by the migration's `apply`.
#[source]
source: anyhow::Error,
},
/// Reading or writing the index's persisted schema version failed.
#[error("schema-version I/O for index '{index_id}': {source}")]
Io {
/// String form of the id of the index being migrated.
index_id: String,
/// The error returned by `read_schema_version` / `write_schema_version`.
#[source]
source: anyhow::Error,
},
}
/// A single schema-migration step that can be applied to an index.
///
/// Implementations are stored as `Arc<dyn Migration>` in [`MigrationRegistry`]
/// and executed by [`run_migrations`].
#[async_trait]
pub trait Migration: Send + Sync {
/// Schema version this migration starts from.
///
/// The registry orders migrations by this value and `chain_from` selects
/// migrations whose source version is at or above the index's current version.
fn source_version(&self) -> u32;
/// Schema version written to the index by `run_migrations` once `apply` succeeds.
fn target_version(&self) -> u32;
/// Static label used in log output and in [`MigrationError::Apply`].
fn description(&self) -> &'static str;
/// Performs the migration against `index`.
///
/// # Errors
///
/// Any error returned here is wrapped in [`MigrationError::Apply`] by
/// `run_migrations`, which then stops without applying later migrations.
async fn apply(&self, index: &IndexHandle) -> Result<(), anyhow::Error>;
}
/// The set of known schema migrations.
pub struct MigrationRegistry {
// Kept sorted by ascending `source_version()` by every constructor in this file.
migrations: Vec<Arc<dyn Migration>>,
}
impl MigrationRegistry {
pub fn new() -> Self {
let mut migrations: Vec<Arc<dyn Migration>> = vec![
Arc::new(M001PerPubConstRust),
Arc::new(M002AbsoluteToRelativePaths),
Arc::new(M003HnswKeyRelativization),
];
migrations.sort_by_key(|m| m.source_version());
Self { migrations }
}
pub fn current_version(&self) -> u32 {
self.migrations
.iter()
.map(|m| m.target_version())
.max()
.unwrap_or(0)
}
pub fn chain_from(&self, current: u32) -> Vec<Arc<dyn Migration>> {
self.migrations
.iter()
.filter(|m| m.source_version() >= current)
.cloned()
.collect()
}
}
impl Default for MigrationRegistry {
fn default() -> Self {
Self::new()
}
}
pub async fn run_migrations(
index: &IndexHandle,
registry: &MigrationRegistry,
) -> Result<(), MigrationError> {
let index_id = index.id.to_string();
let mut current = index
.read_schema_version()
.await
.map_err(|e| MigrationError::Io {
index_id: index_id.clone(),
source: e,
})?;
let target = registry.current_version();
if current >= target {
tracing::debug!(
index_id = %index_id,
current,
target,
"no migrations needed"
);
return Ok(());
}
tracing::info!(
index_id = %index_id,
current,
target,
"running schema migrations"
);
for migration in registry.chain_from(current) {
tracing::info!(
index_id = %index_id,
from = migration.source_version(),
to = migration.target_version(),
description = migration.description(),
"applying migration"
);
migration
.apply(index)
.await
.map_err(|source| MigrationError::Apply {
from: migration.source_version(),
to: migration.target_version(),
description: migration.description(),
source,
})?;
index
.write_schema_version(migration.target_version())
.await
.map_err(|e| MigrationError::Io {
index_id: index_id.clone(),
source: e,
})?;
current = migration.target_version();
tracing::info!(
index_id = %index_id,
now_at = current,
"migration complete"
);
}
Ok(())
}
impl IndexHandle {
    /// Reads the schema version persisted for this index.
    ///
    /// Returns `Ok(0)` when the indexer has no corpus store. Otherwise the
    /// store's synchronous read is run through `tokio::task::spawn_blocking`.
    ///
    /// # Errors
    ///
    /// Returns the store's own error, or an error describing the join failure
    /// if the blocking task did not complete.
    pub async fn read_schema_version(&self) -> anyhow::Result<u32> {
        // The read guard is a temporary of this statement, so the lock is
        // released before the blocking task is spawned.
        let maybe_store = self.indexer.read().await.corpus_store();
        match maybe_store {
            None => Ok(0),
            Some(store) => {
                let joined =
                    tokio::task::spawn_blocking(move || store.read_schema_version_sync()).await;
                match joined {
                    Ok(outcome) => outcome,
                    Err(e) => Err(anyhow::anyhow!("schema_version read task panicked: {e}")),
                }
            }
        }
    }

    /// Persists `version` as this index's schema version.
    ///
    /// The store's synchronous write is run through `tokio::task::spawn_blocking`.
    ///
    /// # Errors
    ///
    /// Fails when the indexer has no corpus store, when the store's write
    /// fails, or when the blocking task did not complete.
    pub async fn write_schema_version(&self, version: u32) -> anyhow::Result<()> {
        // As in the read path, the lock is released at the end of this statement.
        let maybe_store = self.indexer.read().await.corpus_store();
        let store = match maybe_store {
            Some(store) => store,
            None => {
                return Err(anyhow::anyhow!(
                    "cannot write schema_version: no durable corpus on this index"
                ));
            }
        };
        match tokio::task::spawn_blocking(move || store.write_schema_version_sync(version)).await {
            Ok(outcome) => outcome,
            Err(e) => Err(anyhow::anyhow!("schema_version write task panicked: {e}")),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Test double for [`Migration`] that only counts how often `apply` ran.
    struct MockMigration {
        from: u32,
        to: u32,
        desc: &'static str,
        call_count: Arc<AtomicU32>,
    }

    impl MockMigration {
        /// Creates a mock together with a handle to its shared call counter.
        fn new(from: u32, to: u32, desc: &'static str) -> (Self, Arc<AtomicU32>) {
            let counter = Arc::new(AtomicU32::new(0));
            let m = Self {
                from,
                to,
                desc,
                call_count: Arc::clone(&counter),
            };
            (m, counter)
        }
    }

    #[async_trait]
    impl Migration for MockMigration {
        fn source_version(&self) -> u32 {
            self.from
        }

        fn target_version(&self) -> u32 {
            self.to
        }

        fn description(&self) -> &'static str {
            self.desc
        }

        /// Always succeeds; bumps the call counter and leaves the index untouched.
        async fn apply(&self, _index: &IndexHandle) -> Result<(), anyhow::Error> {
            self.call_count.fetch_add(1, Ordering::Relaxed);
            Ok(())
        }
    }

    /// Builds a registry of mock migrations from `(source, target)` version
    /// pairs, sorted by source version like the production constructor.
    fn registry_from(pairs: Vec<(u32, u32)>) -> MigrationRegistry {
        let mut migrations: Vec<Arc<dyn Migration>> = pairs
            .into_iter()
            .map(|(from, to)| {
                // The call counter is not inspected by these tests.
                let (mock, _calls) = MockMigration::new(from, to, "mock");
                Arc::new(mock) as Arc<dyn Migration>
            })
            .collect();
        migrations.sort_by_key(|m| m.source_version());
        MigrationRegistry { migrations }
    }

    /// `chain_from` returns the migrations at or above the given version, in order.
    #[test]
    fn test_migration_registry_chain_computation() {
        let reg = registry_from(vec![(0, 1), (1, 2), (2, 3)]);
        assert_eq!(reg.current_version(), 3);

        let chain = reg.chain_from(0);
        assert_eq!(chain.len(), 3);
        assert_eq!(chain[0].source_version(), 0);
        assert_eq!(chain[1].source_version(), 1);
        assert_eq!(chain[2].source_version(), 2);

        let chain = reg.chain_from(1);
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].source_version(), 1);
        assert_eq!(chain[1].source_version(), 2);

        let chain = reg.chain_from(3);
        assert!(chain.is_empty());
    }

    /// An empty registry reports version 0 and yields no migrations.
    #[test]
    fn test_registry_empty() {
        let reg = MigrationRegistry {
            migrations: Vec::new(),
        };
        assert_eq!(reg.current_version(), 0);
        assert!(reg.chain_from(0).is_empty());
    }

    /// Guards against adding a migration without bumping `CURRENT_SCHEMA_VERSION`.
    #[test]
    fn test_production_registry_current_version_matches_constant() {
        let reg = MigrationRegistry::new();
        assert_eq!(reg.current_version(), CURRENT_SCHEMA_VERSION);
    }

    /// With no migrations registered the target version is 0, so nothing runs.
    #[tokio::test]
    async fn test_run_migrations_no_op_on_empty_registry() {
        let empty_reg = MigrationRegistry {
            migrations: Vec::new(),
        };
        let handle = make_test_handle();
        let result = run_migrations(&handle, &empty_reg).await;
        assert!(result.is_ok(), "empty registry must not error: {result:?}");
    }

    /// A pending migration on a handle without a corpus applies, then fails
    /// when the new schema version cannot be written.
    #[tokio::test]
    async fn test_run_migrations_no_corpus_returns_io_err_when_migrations_pending() {
        let reg = registry_from(vec![(0, 1)]);
        let handle = make_test_handle();
        let result = run_migrations(&handle, &reg).await;
        assert!(
            matches!(result, Err(MigrationError::Io { .. })),
            "no-corpus write must surface as Io error, got: {result:?}"
        );
    }

    /// Without a corpus, reads report version 0 and writes are rejected.
    #[tokio::test]
    async fn test_schema_version_roundtrip_no_corpus() {
        let handle = make_test_handle();
        let v = handle.read_schema_version().await.unwrap();
        assert_eq!(v, 0, "no-corpus handle must report version 0");
        let result = handle.write_schema_version(1).await;
        assert!(
            result.is_err(),
            "write on no-corpus handle must fail: {result:?}"
        );
    }

    /// Builds a bare `IndexHandle` around a fresh `CodeIndexer`; the tests
    /// above expect such a handle to have no corpus store.
    fn make_test_handle() -> IndexHandle {
        use crate::core::indexer::CodeIndexer;
        use crate::core::registry::IndexId;
        use tokio::sync::RwLock;

        let indexer = CodeIndexer::new("migration-test", "/tmp/migration-test");
        IndexHandle::bare(
            IndexId::new("migration-test"),
            Arc::new(RwLock::new(indexer)),
            std::path::PathBuf::from("/tmp/migration-test"),
        )
    }
}