use anyhow::{Context, Result};
use rusqlite::Connection;
use std::fs;
use std::path::{Path, PathBuf};
use tracing::info;
/// Handle to the BioVault SQLite database.
///
/// Build one with [`BioVaultDb::new`], which resolves the database path,
/// migrates a legacy `messages.db` when needed, and applies the schema.
pub struct BioVaultDb {
    /// The open SQLite connection (also available through [`BioVaultDb::connection`]).
    pub conn: Connection,
}
impl BioVaultDb {
    /// Opens (creating if necessary) the BioVault database and brings its schema up to date.
    ///
    /// If a legacy `messages.db` exists and `biovault.db` does not, the legacy
    /// database is migrated first. The connection then gets a 5000 ms busy
    /// timeout and is switched to WAL journaling.
    ///
    /// # Errors
    /// Fails if the BioVault home cannot be resolved, the legacy migration fails,
    /// the database cannot be opened, or schema setup / migrations fail.
    pub fn new() -> Result<Self> {
        let db_path = get_biovault_db_path()?;
        if needs_migration()? {
            migrate_from_messages_db(&db_path)?;
        }
        let conn = Connection::open(&db_path)
            .with_context(|| format!("Failed to open database at {:?}", db_path))?;
        // Install the busy timeout first so that switching the journal mode
        // (which needs a lock) waits for other connections instead of failing
        // immediately with SQLITE_BUSY.
        conn.pragma_update(None, "busy_timeout", 5000)?;
        conn.pragma_update(None, "journal_mode", "WAL")?;
        Self::init_schema(&conn)?;
        Ok(Self { conn })
    }

    /// Applies the bundled `schema.sql`, runs incremental column migrations, and
    /// records schema version `2.0.0` if no version has been recorded yet.
    fn init_schema(conn: &Connection) -> Result<()> {
        let schema = include_str!("../schema.sql");
        conn.execute_batch(schema)
            .context("Failed to apply base schema (schema.sql)")?;
        Self::run_migrations(conn)?;
        if get_schema_version(conn)?.is_none() {
            conn.execute(
                "INSERT INTO schema_version (version) VALUES (?1)",
                ["2.0.0"],
            )?;
            info!("Initialized schema version 2.0.0");
        }
        Ok(())
    }

    /// Returns whether `table` has a column named `column`.
    ///
    /// `pragma_table_info` yields no rows for an unknown table, so a missing
    /// table reports `false`. Any other query failure is propagated rather than
    /// being mistaken for "column absent".
    fn column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
        let count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM pragma_table_info(?1) WHERE name = ?2",
                [table, column],
                |row| row.get(0),
            )
            .with_context(|| format!("Failed to inspect columns of table '{}'", table))?;
        Ok(count > 0)
    }

    /// Upgrades older on-disk layouts in place.
    ///
    /// Each step is guarded by a column-existence check, so rerunning after a
    /// partial failure is safe: additions are skipped once present, and the
    /// `source` copy uses `INSERT OR IGNORE` on the unique `file_id`.
    fn run_migrations(conn: &Connection) -> Result<()> {
        if !Self::column_exists(conn, "files", "data_type")? {
            info!("Adding data_type column to files table");
            conn.execute(
                "ALTER TABLE files ADD COLUMN data_type TEXT DEFAULT 'Unknown'",
                [],
            )?;
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_files_data_type ON files(data_type)",
                [],
            )?;
            info!("Migration complete: added data_type column and index");
        }

        if !Self::column_exists(conn, "participants", "inferred_sex")? {
            info!("Adding inferred_sex column to participants table");
            conn.execute("ALTER TABLE participants ADD COLUMN inferred_sex TEXT", [])?;
            info!("Migration complete: added inferred_sex to participants");
        }

        if !Self::column_exists(conn, "files", "status")? {
            info!("Adding status column to files table");
            conn.execute(
                "ALTER TABLE files ADD COLUMN status TEXT DEFAULT 'complete'",
                [],
            )?;
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_files_status ON files(status)",
                [],
            )?;
            info!("Migration complete: added status column and index");
        }

        if !Self::column_exists(conn, "files", "processing_error")? {
            info!("Adding processing_error column to files table");
            conn.execute("ALTER TABLE files ADD COLUMN processing_error TEXT", [])?;
            info!("Migration complete: added processing_error column");
        }

        if !Self::column_exists(conn, "files", "queue_added_at")? {
            info!("Adding queue_added_at column to files table");
            conn.execute("ALTER TABLE files ADD COLUMN queue_added_at DATETIME", [])?;
            info!("Migration complete: added queue_added_at column");
        }

        // genotype_metadata must exist BEFORE the legacy `files.source` /
        // `files.grch_version` data is copied into it below; otherwise a legacy
        // database fails with "no such table: genotype_metadata".
        conn.execute(
            "CREATE TABLE IF NOT EXISTS genotype_metadata (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                file_id INTEGER UNIQUE NOT NULL,
                source TEXT,
                grch_version TEXT,
                row_count INTEGER,
                chromosome_count INTEGER,
                inferred_sex TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE
            )",
            [],
        )?;
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_genotype_file_id ON genotype_metadata(file_id)",
            [],
        )?;

        if !Self::column_exists(conn, "genotype_metadata", "inferred_sex")? {
            info!("Adding inferred_sex column to genotype_metadata table");
            conn.execute(
                "ALTER TABLE genotype_metadata ADD COLUMN inferred_sex TEXT",
                [],
            )?;
            info!("Migration complete: added inferred_sex column");
        }

        if Self::column_exists(conn, "files", "source")? {
            info!("Migrating source column from files to genotype_metadata");
            // Tolerate a legacy layout that has `source` but not `grch_version`.
            // Both alternatives are fixed literals, so formatting them in is safe.
            let grch_expr = if Self::column_exists(conn, "files", "grch_version")? {
                "grch_version"
            } else {
                "NULL"
            };
            let copy_sql = format!(
                "INSERT OR IGNORE INTO genotype_metadata (file_id, source, grch_version)
                 SELECT id, source, {} FROM files WHERE data_type = 'Genotype'",
                grch_expr
            );
            conn.execute(&copy_sql, [])?;
            conn.execute("ALTER TABLE files DROP COLUMN source", [])?;
            info!("Migration complete: moved source to genotype_metadata");
        }

        if Self::column_exists(conn, "files", "grch_version")? {
            info!("Removing grch_version column from files");
            conn.execute("ALTER TABLE files DROP COLUMN grch_version", [])?;
            info!("Migration complete: removed grch_version from files");
        }

        Ok(())
    }

    /// Borrows the underlying SQLite connection.
    pub fn connection(&self) -> &Connection {
        &self.conn
    }
}
/// Location of the current database: `biovault.db` inside the BioVault home directory.
fn get_biovault_db_path() -> Result<PathBuf> {
    let home = crate::config::get_biovault_home()?;
    Ok(home.join("biovault.db"))
}
/// Location of the legacy database: `messages.db` inside the BioVault home directory.
fn get_messages_db_path() -> Result<PathBuf> {
    let home = crate::config::get_biovault_home()?;
    Ok(home.join("messages.db"))
}
/// Reports whether the legacy `messages.db` should be migrated, i.e. it exists
/// while `biovault.db` does not yet.
fn needs_migration() -> Result<bool> {
    let (current, legacy) = (get_biovault_db_path()?, get_messages_db_path()?);
    let has_legacy = legacy.exists();
    Ok(has_legacy && !current.exists())
}
fn migrate_from_messages_db(target_path: &Path) -> Result<()> {
let messages_db = get_messages_db_path()?;
println!("🔄 Migrating messages.db → biovault.db...");
info!("Starting database migration");
let backup_path = messages_db.with_extension("db.backup");
fs::copy(&messages_db, &backup_path).context("Failed to backup messages.db")?;
println!("✓ Backed up to messages.db.backup");
info!("Backed up messages.db to {:?}", backup_path);
fs::copy(&messages_db, target_path).context("Failed to copy to biovault.db")?;
println!("✓ Copied to biovault.db");
info!("Copied to biovault.db at {:?}", target_path);
let conn = Connection::open(target_path)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS schema_version (
version TEXT PRIMARY KEY,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS participants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
participant_id TEXT UNIQUE NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
participant_id INTEGER,
file_path TEXT UNIQUE NOT NULL,
file_hash TEXT NOT NULL,
file_type TEXT,
file_size INTEGER,
metadata TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (participant_id) REFERENCES participants(id) ON DELETE SET NULL
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_participant_id ON participants(participant_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_files_participant_id ON files(participant_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_files_file_type ON files(file_type)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
author TEXT NOT NULL,
workflow TEXT NOT NULL,
template TEXT NOT NULL,
project_path TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER NOT NULL,
work_dir TEXT NOT NULL,
participant_count INTEGER NOT NULL,
status TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (project_id) REFERENCES projects(id) ON DELETE CASCADE
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_runs_project_id ON runs(project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_runs_status ON runs(status)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS run_participants (
run_id INTEGER NOT NULL,
participant_id INTEGER NOT NULL,
FOREIGN KEY (run_id) REFERENCES runs(id) ON DELETE CASCADE,
FOREIGN KEY (participant_id) REFERENCES participants(id) ON DELETE CASCADE,
PRIMARY KEY (run_id, participant_id)
)",
[],
)?;
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
["2.0.0"],
)?;
println!("✓ Schema upgraded to v2.0.0");
println!("✅ Migration complete!");
info!("Migration complete - schema version 2.0.0");
Ok(())
}
fn get_schema_version(conn: &Connection) -> Result<Option<String>> {
match conn.query_row(
"SELECT version FROM schema_version ORDER BY applied_at DESC LIMIT 1",
[],
|row| row.get(0),
) {
Ok(version) => Ok(Some(version)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(_) => Ok(None), }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};
    use tempfile::TempDir;

    /// Serializes tests that mutate the process-wide `BIOVAULT_HOME` variable.
    ///
    /// `cargo test` runs tests on parallel threads. Without this lock one test
    /// can resolve its database path from another test's temp dir, and that
    /// directory may be deleted while still in use.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires [`ENV_LOCK`]. The guarded data is `()`, so a lock poisoned by
    /// a panicking test is still safe to reuse.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    #[test]
    fn test_db_creation() {
        // Declaration order matters: `db` drops first, then `temp`, then the lock.
        let _env = lock_env();
        let temp = TempDir::new().unwrap();
        std::env::set_var("BIOVAULT_HOME", temp.path());
        let db = BioVaultDb::new().unwrap();

        let version: String = db
            .conn
            .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
                row.get(0)
            })
            .expect("schema_version should contain a row");
        assert_eq!(version, "2.0.0");
    }

    #[test]
    fn test_tables_created() {
        let _env = lock_env();
        let temp = TempDir::new().unwrap();
        std::env::set_var("BIOVAULT_HOME", temp.path());
        let db = BioVaultDb::new().unwrap();

        let tables: Vec<String> = db
            .conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
            .unwrap()
            .query_map([], |row| row.get(0))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        for expected in [
            "schema_version",
            "messages",
            "participants",
            "files",
            "projects",
            "runs",
            "run_participants",
            "genotype_metadata",
        ] {
            assert!(
                tables.iter().any(|name| name == expected),
                "missing table: {}",
                expected
            );
        }
    }
}