use std::path::Path;
use anyhow::{Context, Result};
use redb::{ReadableDatabase, ReadableTableMetadata as _};
use redb2::ReadableTable as _;
/// Key/value type layout of a corpus table.
///
/// The same layout is used to open a table in the redb 2.x source and in the redb 4.x
/// destination, so both sides agree on the stored types.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Shape {
    /// `&str` keys mapped to `&[u8]` values.
    StrBytes,
    /// `u64` keys mapped to `&[u8]` values.
    U64Bytes,
    /// `&str` keys mapped to `u64` values.
    StrU64,
}
/// Every table that makes up a corpus, paired with the [`Shape`] used to open it.
///
/// `copy_all_tables` walks this list in order and copies each entry with the listed shape.
/// The order is therefore also the order of the per-table counts it reports.
const CORPUS_TABLES: &[(&str, Shape)] = &[
    // String-keyed blob tables.
    ("chunks", Shape::StrBytes),
    ("entities", Shape::StrBytes),
    ("kg_nodes", Shape::StrBytes),
    ("kg_edges", Shape::StrBytes),
    ("kg_edges_rev", Shape::StrBytes),
    ("file_hashes", Shape::StrBytes),
    // Integer-keyed blob table.
    ("kg_communities", Shape::U64Bytes),
    // String-keyed integer table.
    ("kg_symbol_community", Shape::StrU64),
    // Metadata; `schema_version` is read back from here after the copy.
    ("_meta", Shape::StrBytes),
];
/// Outcome of [`copy_all_tables`], laid out as `(per_table, total_rows, schema_version)`.
///
/// * `per_table`: `(table name, rows copied)` for every entry of `CORPUS_TABLES`, in order.
/// * `total_rows`: the sum of all per-table counts.
/// * `schema_version`: `schema_version` from the migrated `_meta` table, or 0 if unreadable.
pub(super) type CopySummary = (
    Vec<(&'static str, u64)>, // per_table
    u64,                      // total_rows
    u32,                      // schema_version
);
/// Copies every table listed in `CORPUS_TABLES` from the redb 2.x database at `source` into
/// the redb 4.x database at `staging`, then verifies the result.
///
/// All rows are inserted inside a single redb 4.x write transaction, which is committed only
/// after every table has been copied. Once the copy is done, the staging file is re-opened,
/// each table's row count is compared against the number of rows copied, and the `_meta`
/// schema version is read back.
///
/// Returns `(per_table, total_rows, schema_version)`:
/// * `per_table` has one entry per `CORPUS_TABLES` entry, including tables that copied 0 rows.
/// * `schema_version` is 0 when it cannot be read.
///
/// # Errors
/// Fails if either database cannot be opened/created, a transaction cannot be started or
/// committed, any table fails to copy, or the row-count verification fails.
pub(super) fn copy_all_tables(source: &Path, staging: &Path) -> Result<CopySummary> {
    // The helper owns every database handle, so all of them are closed by the time
    // `verify_counts` re-opens the staging file.
    let per_table = copy_into_staging(source, staging)?;
    let total_rows: u64 = per_table.iter().map(|&(_, rows)| rows).sum();

    verify_counts(staging, &per_table)
        .context("verify migrated staging corpus row counts match the source")?;
    let schema_version = read_schema_version(staging).unwrap_or(0);

    Ok((per_table, total_rows, schema_version))
}

/// Copies all corpus tables in one write transaction.
///
/// Returns the rows copied per table, in `CORPUS_TABLES` order. Handles are dropped on return.
fn copy_into_staging(source: &Path, staging: &Path) -> Result<Vec<(&'static str, u64)>> {
    let src = redb2::Database::open(source)
        .with_context(|| format!("open redb 2.x source {}", source.display()))?;
    let dst = redb::Database::create(staging)
        .with_context(|| format!("create staging redb 4.x corpus {}", staging.display()))?;
    let read = src.begin_read().context("begin redb 2.x read transaction")?;
    let write = dst.begin_write().context("begin redb 4.x write transaction")?;

    let mut counts = Vec::with_capacity(CORPUS_TABLES.len());
    for &(table, shape) in CORPUS_TABLES {
        let rows = match shape {
            Shape::StrBytes => copy_str_bytes(&read, &write, table)?,
            Shape::U64Bytes => copy_u64_bytes(&read, &write, table)?,
            Shape::StrU64 => copy_str_u64(&read, &write, table)?,
        };
        if rows > 0 {
            tracing::debug!(table = table, rows = rows, "copied redb table");
        }
        counts.push((table, rows));
    }

    write
        .commit()
        .context("commit redb 4.x write transaction")?;
    Ok(counts)
}
fn copy_str_bytes(
read: &redb2::ReadTransaction,
write: &redb::WriteTransaction,
name: &str,
) -> Result<u64> {
let def2: redb2::TableDefinition<&str, &[u8]> = redb2::TableDefinition::new(name);
let src = match read.open_table(def2) {
Ok(t) => t,
Err(redb2::TableError::TableDoesNotExist(_)) => return Ok(0),
Err(e) => return Err(anyhow::anyhow!("open 2.x table '{name}': {e}")),
};
let def4: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(name);
let mut dst = write
.open_table(def4)
.with_context(|| format!("open 4.x table '{name}'"))?;
let mut n = 0u64;
for entry in src.iter().with_context(|| format!("iterate '{name}'"))? {
let (k, v) = entry.with_context(|| format!("read row in '{name}'"))?;
dst.insert(k.value(), v.value())
.with_context(|| format!("insert row into '{name}'"))?;
n += 1;
}
Ok(n)
}
fn copy_u64_bytes(
read: &redb2::ReadTransaction,
write: &redb::WriteTransaction,
name: &str,
) -> Result<u64> {
let def2: redb2::TableDefinition<u64, &[u8]> = redb2::TableDefinition::new(name);
let src = match read.open_table(def2) {
Ok(t) => t,
Err(redb2::TableError::TableDoesNotExist(_)) => return Ok(0),
Err(e) => return Err(anyhow::anyhow!("open 2.x table '{name}': {e}")),
};
let def4: redb::TableDefinition<u64, &[u8]> = redb::TableDefinition::new(name);
let mut dst = write
.open_table(def4)
.with_context(|| format!("open 4.x table '{name}'"))?;
let mut n = 0u64;
for entry in src.iter().with_context(|| format!("iterate '{name}'"))? {
let (k, v) = entry.with_context(|| format!("read row in '{name}'"))?;
dst.insert(k.value(), v.value())
.with_context(|| format!("insert row into '{name}'"))?;
n += 1;
}
Ok(n)
}
fn copy_str_u64(
read: &redb2::ReadTransaction,
write: &redb::WriteTransaction,
name: &str,
) -> Result<u64> {
let def2: redb2::TableDefinition<&str, u64> = redb2::TableDefinition::new(name);
let src = match read.open_table(def2) {
Ok(t) => t,
Err(redb2::TableError::TableDoesNotExist(_)) => return Ok(0),
Err(e) => return Err(anyhow::anyhow!("open 2.x table '{name}': {e}")),
};
let def4: redb::TableDefinition<&str, u64> = redb::TableDefinition::new(name);
let mut dst = write
.open_table(def4)
.with_context(|| format!("open 4.x table '{name}'"))?;
let mut n = 0u64;
for entry in src.iter().with_context(|| format!("iterate '{name}'"))? {
let (k, v) = entry.with_context(|| format!("read row in '{name}'"))?;
dst.insert(k.value(), v.value())
.with_context(|| format!("insert row into '{name}'"))?;
n += 1;
}
Ok(n)
}
/// Re-opens the staging database and checks that every table that received rows now holds
/// exactly the number of rows recorded for it in `per_table`.
///
/// Entries with a recorded count of 0 are not checked at all.
///
/// # Errors
/// Fails if the staging file cannot be opened, a read transaction cannot be started, a
/// table's length cannot be determined, or any table's length differs from its recorded count.
fn verify_counts(staging: &Path, per_table: &[(&'static str, u64)]) -> Result<()> {
    let db = redb::Database::open(staging).with_context(|| {
        format!(
            "re-open staging corpus {} for verification",
            staging.display()
        )
    })?;
    let read = db.begin_read().context("begin verify read txn")?;

    per_table
        .iter()
        .filter(|(_, expected)| *expected != 0)
        .try_for_each(|(name, expected)| {
            let actual = verify_table_len(&read, name, expected)?;
            anyhow::ensure!(
                actual == *expected,
                "row-count mismatch after migration: table '{name}' has {actual} rows in the \
                 migrated corpus but {expected} were copied from the source"
            );
            Ok(())
        })
}
fn verify_table_len(read: &redb::ReadTransaction, name: &str, expected: &u64) -> Result<u64> {
if let Ok(t) = read.open_table::<&str, &[u8]>(redb::TableDefinition::new(name)) {
return t.len().with_context(|| format!("len of '{name}'"));
}
if let Ok(t) = read.open_table::<u64, &[u8]>(redb::TableDefinition::new(name)) {
return t.len().with_context(|| format!("len of '{name}'"));
}
if let Ok(t) = read.open_table::<&str, u64>(redb::TableDefinition::new(name)) {
return t.len().with_context(|| format!("len of '{name}'"));
}
anyhow::bail!(
"verification could not open migrated table '{name}' (expected {expected} rows) under any \
known shape"
)
}
/// Best-effort read of the `schema_version` entry from the `_meta` table of the corpus at
/// `staging`.
///
/// The stored value must be exactly four bytes and is decoded as a little-endian `u32`.
/// Returns `None` in any of these cases:
/// * the database cannot be opened or read;
/// * the table or key is missing;
/// * the value has any other length.
fn read_schema_version(staging: &Path) -> Option<u32> {
    let meta: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new("_meta");
    let db = redb::Database::open(staging).ok()?;
    let txn = db.begin_read().ok()?;
    let table = txn.open_table(meta).ok()?;
    let entry = table.get("schema_version").ok().flatten()?;
    match entry.value() {
        &[b0, b1, b2, b3] => Some(u32::from_le_bytes([b0, b1, b2, b3])),
        _ => None,
    }
}