use rusqlite::Connection;
use crate::error::StorageResult;
pub const SCHEMA_VERSION: i32 = 7;
pub fn initialize_schema(conn: &Connection) -> StorageResult<()> {
let current_version = get_schema_version(conn)?;
if current_version == 0 {
create_schema_v1(conn)?;
set_schema_version(conn, 1)?;
migrate_schema(conn, 1)?;
} else if current_version < SCHEMA_VERSION {
migrate_schema(conn, current_version)?;
}
Ok(())
}
fn get_schema_version(conn: &Connection) -> StorageResult<i32> {
conn.execute(
"CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create schema_version table: {}", e),
source: None,
})
})?;
let version: Option<i32> = conn
.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
row.get(0)
})
.ok();
Ok(version.unwrap_or(0))
}
fn set_schema_version(conn: &Connection, version: i32) -> StorageResult<()> {
conn.execute("DELETE FROM schema_version", [])
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to clear schema_version: {}", e),
source: None,
})
})?;
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
[version],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to set schema_version: {}", e),
source: None,
})
})?;
Ok(())
}
fn create_schema_v1(conn: &Connection) -> StorageResult<()> {
conn.execute(
"CREATE TABLE IF NOT EXISTS resources (
tenant_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
id TEXT NOT NULL,
version_id TEXT NOT NULL,
data BLOB NOT NULL,
last_updated TEXT NOT NULL,
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TEXT,
PRIMARY KEY (tenant_id, resource_type, id)
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create resources table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS resource_history (
tenant_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
id TEXT NOT NULL,
version_id TEXT NOT NULL,
data BLOB NOT NULL,
last_updated TEXT NOT NULL,
is_deleted INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (tenant_id, resource_type, id, version_id)
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create resource_history table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS search_index (
tenant_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
param_name TEXT NOT NULL,
param_url TEXT,
value_string TEXT,
value_token_system TEXT,
value_token_code TEXT,
value_token_display TEXT,
value_date TEXT,
value_date_precision TEXT,
value_number REAL,
value_quantity_value REAL,
value_quantity_unit TEXT,
value_quantity_system TEXT,
value_reference TEXT,
value_uri TEXT,
composite_group INTEGER,
value_identifier_type_system TEXT,
value_identifier_type_code TEXT,
FOREIGN KEY (tenant_id, resource_type, resource_id)
REFERENCES resources(tenant_id, resource_type, id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create search_index table: {}", e),
source: None,
})
})?;
create_indexes(conn)?;
create_fts_table(conn)?;
Ok(())
}
fn create_indexes(conn: &Connection) -> StorageResult<()> {
let indexes = [
"CREATE INDEX IF NOT EXISTS idx_resources_type ON resources(tenant_id, resource_type)",
"CREATE INDEX IF NOT EXISTS idx_resources_updated ON resources(tenant_id, last_updated)",
"CREATE INDEX IF NOT EXISTS idx_history_resource ON resource_history(tenant_id, resource_type, id)",
"CREATE INDEX IF NOT EXISTS idx_history_updated ON resource_history(tenant_id, last_updated)",
"CREATE INDEX IF NOT EXISTS idx_search_string ON search_index(tenant_id, resource_type, param_name, value_string)",
"CREATE INDEX IF NOT EXISTS idx_search_token ON search_index(tenant_id, resource_type, param_name, value_token_system, value_token_code)",
"CREATE INDEX IF NOT EXISTS idx_search_date ON search_index(tenant_id, resource_type, param_name, value_date)",
"CREATE INDEX IF NOT EXISTS idx_search_number ON search_index(tenant_id, resource_type, param_name, value_number)",
"CREATE INDEX IF NOT EXISTS idx_search_quantity ON search_index(tenant_id, resource_type, param_name, value_quantity_value, value_quantity_unit)",
"CREATE INDEX IF NOT EXISTS idx_search_reference ON search_index(tenant_id, resource_type, param_name, value_reference)",
"CREATE INDEX IF NOT EXISTS idx_search_uri ON search_index(tenant_id, resource_type, param_name, value_uri)",
"CREATE INDEX IF NOT EXISTS idx_search_composite ON search_index(tenant_id, resource_type, resource_id, param_name, composite_group)",
"CREATE INDEX IF NOT EXISTS idx_search_resource ON search_index(tenant_id, resource_type, resource_id)",
"CREATE INDEX IF NOT EXISTS idx_search_token_display ON search_index(tenant_id, resource_type, param_name, value_token_display)",
"CREATE INDEX IF NOT EXISTS idx_search_identifier_type ON search_index(tenant_id, resource_type, param_name, value_identifier_type_system, value_identifier_type_code)",
];
for index_sql in &indexes {
conn.execute(index_sql, []).map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create index: {}", e),
source: None,
})
})?;
}
Ok(())
}
fn create_fts_table(conn: &Connection) -> StorageResult<()> {
let fts5_available: i32 = conn
.query_row(
"SELECT sqlite_compileoption_used('ENABLE_FTS5')",
[],
|row| row.get(0),
)
.unwrap_or(0);
if fts5_available == 0 {
return Ok(());
}
conn.execute(
"CREATE VIRTUAL TABLE IF NOT EXISTS resource_fts USING fts5(
resource_id UNINDEXED,
resource_type UNINDEXED,
tenant_id UNINDEXED,
narrative_text,
full_content,
tokenize='porter unicode61 remove_diacritics 1'
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create resource_fts table: {}", e),
source: None,
})
})?;
Ok(())
}
fn migrate_schema(conn: &Connection, from_version: i32) -> StorageResult<()> {
let mut version = from_version;
while version < SCHEMA_VERSION {
match version {
1 => migrate_v1_to_v2(conn)?,
2 => migrate_v2_to_v3(conn)?,
3 => migrate_v3_to_v4(conn)?,
4 => migrate_v4_to_v5(conn)?,
5 => migrate_v5_to_v6(conn)?,
6 => migrate_v6_to_v7(conn)?,
_ => {
return Err(crate::error::StorageError::Backend(
crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Unknown schema version: {}", version),
source: None,
},
));
}
}
version += 1;
set_schema_version(conn, version)?;
}
Ok(())
}
fn migrate_v1_to_v2(conn: &Connection) -> StorageResult<()> {
let migrations = [
"ALTER TABLE search_index ADD COLUMN param_url TEXT",
"ALTER TABLE search_index ADD COLUMN value_date_precision TEXT",
"ALTER TABLE search_index ADD COLUMN value_quantity_system TEXT",
"ALTER TABLE search_index ADD COLUMN composite_group INTEGER",
];
for sql in &migrations {
let _ = conn.execute(sql, []);
}
let indexes = [
"CREATE INDEX IF NOT EXISTS idx_search_quantity ON search_index(tenant_id, resource_type, param_name, value_quantity_value, value_quantity_unit)",
"CREATE INDEX IF NOT EXISTS idx_search_composite ON search_index(tenant_id, resource_type, resource_id, param_name, composite_group)",
"CREATE INDEX IF NOT EXISTS idx_search_resource ON search_index(tenant_id, resource_type, resource_id)",
];
for index_sql in &indexes {
conn.execute(index_sql, []).map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create index in migration: {}", e),
source: None,
})
})?;
}
Ok(())
}
fn migrate_v2_to_v3(conn: &Connection) -> StorageResult<()> {
let fts5_available: i32 = conn
.query_row(
"SELECT sqlite_compileoption_used('ENABLE_FTS5')",
[],
|row| row.get(0),
)
.unwrap_or(0);
if fts5_available == 0 {
tracing::warn!("FTS5 not available - full-text search features will be disabled");
return Ok(());
}
conn.execute(
"CREATE VIRTUAL TABLE IF NOT EXISTS resource_fts USING fts5(
resource_id UNINDEXED,
resource_type UNINDEXED,
tenant_id UNINDEXED,
narrative_text,
full_content,
tokenize='porter unicode61 remove_diacritics 1'
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create resource_fts table: {}", e),
source: None,
})
})?;
Ok(())
}
fn migrate_v3_to_v4(conn: &Connection) -> StorageResult<()> {
let migrations = [
"ALTER TABLE search_index ADD COLUMN value_token_display TEXT",
"ALTER TABLE search_index ADD COLUMN value_identifier_type_system TEXT",
"ALTER TABLE search_index ADD COLUMN value_identifier_type_code TEXT",
];
for sql in &migrations {
let _ = conn.execute(sql, []);
}
let indexes = [
"CREATE INDEX IF NOT EXISTS idx_search_token_display ON search_index(tenant_id, resource_type, param_name, value_token_display)",
"CREATE INDEX IF NOT EXISTS idx_search_identifier_type ON search_index(tenant_id, resource_type, param_name, value_identifier_type_system, value_identifier_type_code)",
];
for index_sql in &indexes {
conn.execute(index_sql, []).map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create index in migration: {}", e),
source: None,
})
})?;
}
Ok(())
}
fn migrate_v4_to_v5(conn: &Connection) -> StorageResult<()> {
let fts5_available: i32 = conn
.query_row(
"SELECT sqlite_compileoption_used('ENABLE_FTS5')",
[],
|row| row.get(0),
)
.unwrap_or(0);
if fts5_available == 0 {
tracing::warn!("FTS5 not available - :text-advanced modifier will not work");
return Ok(());
}
let fts_exists: i32 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='search_index_fts'",
[],
|row| row.get(0),
)
.unwrap_or(0);
if fts_exists == 0 {
conn.execute(
r#"
CREATE VIRTUAL TABLE IF NOT EXISTS search_index_fts USING fts5(
text_content,
content='search_index',
content_rowid='rowid',
tokenize='porter unicode61'
)
"#,
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create search_index_fts table: {}", e),
source: None,
})
})?;
}
let _ = conn.execute("DROP TRIGGER IF EXISTS search_index_fts_insert", []);
let _ = conn.execute("DROP TRIGGER IF EXISTS search_index_fts_delete", []);
let _ = conn.execute("DROP TRIGGER IF EXISTS search_index_fts_update", []);
conn.execute(
r#"
CREATE TRIGGER search_index_fts_insert AFTER INSERT ON search_index
WHEN new.value_string IS NOT NULL OR new.value_token_display IS NOT NULL
BEGIN
INSERT INTO search_index_fts(rowid, text_content)
VALUES (new.rowid, COALESCE(new.value_string, '') || ' ' || COALESCE(new.value_token_display, ''));
END
"#,
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create FTS insert trigger: {}", e),
source: None,
})
})?;
conn.execute(
r#"
CREATE TRIGGER search_index_fts_delete AFTER DELETE ON search_index
WHEN old.value_string IS NOT NULL OR old.value_token_display IS NOT NULL
BEGIN
INSERT INTO search_index_fts(search_index_fts, rowid, text_content)
VALUES ('delete', old.rowid, COALESCE(old.value_string, '') || ' ' || COALESCE(old.value_token_display, ''));
END
"#,
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create FTS delete trigger: {}", e),
source: None,
})
})?;
conn.execute(
r#"
CREATE TRIGGER search_index_fts_update AFTER UPDATE ON search_index
WHEN old.value_string IS NOT NULL OR new.value_string IS NOT NULL
OR old.value_token_display IS NOT NULL OR new.value_token_display IS NOT NULL
BEGIN
INSERT INTO search_index_fts(search_index_fts, rowid, text_content)
VALUES ('delete', old.rowid, COALESCE(old.value_string, '') || ' ' || COALESCE(old.value_token_display, ''));
INSERT INTO search_index_fts(rowid, text_content)
VALUES (new.rowid, COALESCE(new.value_string, '') || ' ' || COALESCE(new.value_token_display, ''));
END
"#,
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create FTS update trigger: {}", e),
source: None,
})
})?;
let _ = conn.execute(
"INSERT INTO search_index_fts(search_index_fts) VALUES ('rebuild')",
[],
);
Ok(())
}
fn migrate_v5_to_v6(conn: &Connection) -> StorageResult<()> {
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_export_jobs (
id TEXT PRIMARY KEY,
tenant_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'accepted',
level TEXT NOT NULL,
group_id TEXT,
request_json TEXT NOT NULL,
transaction_time TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
error_message TEXT,
current_type TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_export_jobs table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_export_jobs_tenant
ON bulk_export_jobs(tenant_id, status)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create idx_export_jobs_tenant: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_export_progress (
job_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
total_count INTEGER,
exported_count INTEGER DEFAULT 0,
error_count INTEGER DEFAULT 0,
cursor_state TEXT,
PRIMARY KEY (job_id, resource_type),
FOREIGN KEY (job_id) REFERENCES bulk_export_jobs(id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_export_progress table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_export_files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
file_type TEXT NOT NULL DEFAULT 'output',
file_path TEXT NOT NULL,
resource_count INTEGER DEFAULT 0,
byte_count INTEGER DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
FOREIGN KEY (job_id) REFERENCES bulk_export_jobs(id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_export_files table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_export_files_job
ON bulk_export_files(job_id)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create idx_export_files_job: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_submissions (
tenant_id TEXT NOT NULL,
submitter TEXT NOT NULL,
submission_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'in-progress',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
completed_at TEXT,
metadata BLOB,
PRIMARY KEY (tenant_id, submitter, submission_id)
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_submissions table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_bulk_submissions_status
ON bulk_submissions(tenant_id, status)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create idx_bulk_submissions_status: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_manifests (
tenant_id TEXT NOT NULL,
submitter TEXT NOT NULL,
submission_id TEXT NOT NULL,
manifest_id TEXT NOT NULL,
manifest_url TEXT,
replaces_manifest_url TEXT,
status TEXT NOT NULL DEFAULT 'pending',
added_at TEXT NOT NULL,
total_entries INTEGER DEFAULT 0,
processed_entries INTEGER DEFAULT 0,
failed_entries INTEGER DEFAULT 0,
PRIMARY KEY (tenant_id, submitter, submission_id, manifest_id),
FOREIGN KEY (tenant_id, submitter, submission_id)
REFERENCES bulk_submissions(tenant_id, submitter, submission_id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_manifests table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_entry_results (
tenant_id TEXT NOT NULL,
submitter TEXT NOT NULL,
submission_id TEXT NOT NULL,
manifest_id TEXT NOT NULL,
line_number INTEGER NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT,
created INTEGER,
outcome TEXT NOT NULL,
operation_outcome BLOB,
PRIMARY KEY (tenant_id, submitter, submission_id, manifest_id, line_number),
FOREIGN KEY (tenant_id, submitter, submission_id, manifest_id)
REFERENCES bulk_manifests(tenant_id, submitter, submission_id, manifest_id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_entry_results table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_bulk_entry_results_outcome
ON bulk_entry_results(tenant_id, submitter, submission_id, manifest_id, outcome)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create idx_bulk_entry_results_outcome: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE TABLE IF NOT EXISTS bulk_submission_changes (
tenant_id TEXT NOT NULL,
submitter TEXT NOT NULL,
submission_id TEXT NOT NULL,
change_id TEXT NOT NULL,
manifest_id TEXT NOT NULL,
change_type TEXT NOT NULL,
resource_type TEXT NOT NULL,
resource_id TEXT NOT NULL,
previous_version TEXT,
new_version TEXT NOT NULL,
previous_content BLOB,
changed_at TEXT NOT NULL,
PRIMARY KEY (tenant_id, submitter, submission_id, change_id),
FOREIGN KEY (tenant_id, submitter, submission_id)
REFERENCES bulk_submissions(tenant_id, submitter, submission_id) ON DELETE CASCADE
)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create bulk_submission_changes table: {}", e),
source: None,
})
})?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_bulk_changes_resource
ON bulk_submission_changes(tenant_id, resource_type, resource_id)",
[],
)
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create idx_bulk_changes_resource: {}", e),
source: None,
})
})?;
Ok(())
}
fn migrate_v6_to_v7(conn: &Connection) -> StorageResult<()> {
let migrations = [
"ALTER TABLE resources ADD COLUMN fhir_version TEXT NOT NULL DEFAULT '4.0'",
"ALTER TABLE resource_history ADD COLUMN fhir_version TEXT NOT NULL DEFAULT '4.0'",
];
for sql in &migrations {
let _ = conn.execute(sql, []);
}
let indexes = [
"CREATE INDEX IF NOT EXISTS idx_resources_fhir_version ON resources(tenant_id, fhir_version)",
];
for index_sql in &indexes {
conn.execute(index_sql, []).map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to create index in migration: {}", e),
source: None,
})
})?;
}
Ok(())
}
#[cfg(test)]
#[allow(dead_code)]
pub fn drop_all_tables(conn: &Connection) -> StorageResult<()> {
let _ = conn.execute("DROP TABLE IF EXISTS resource_fts", []);
let _ = conn.execute("DROP TABLE IF EXISTS search_index_fts", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_submission_changes", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_entry_results", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_manifests", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_submissions", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_export_files", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_export_progress", []);
let _ = conn.execute("DROP TABLE IF EXISTS bulk_export_jobs", []);
conn.execute("DROP TABLE IF EXISTS search_index", [])
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to drop search_index: {}", e),
source: None,
})
})?;
conn.execute("DROP TABLE IF EXISTS resource_history", [])
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to drop resource_history: {}", e),
source: None,
})
})?;
conn.execute("DROP TABLE IF EXISTS resources", [])
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to drop resources: {}", e),
source: None,
})
})?;
conn.execute("DROP TABLE IF EXISTS schema_version", [])
.map_err(|e| {
crate::error::StorageError::Backend(crate::error::BackendError::Internal {
backend_name: "sqlite".to_string(),
message: format!("Failed to drop schema_version: {}", e),
source: None,
})
})?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_schema_initialization() {
let conn = Connection::open_in_memory().unwrap();
initialize_schema(&conn).unwrap();
let tables: Vec<String> = conn
.prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
.unwrap()
.query_map([], |row| row.get(0))
.unwrap()
.filter_map(|r| r.ok())
.collect();
assert!(tables.contains(&"resources".to_string()));
assert!(tables.contains(&"resource_history".to_string()));
assert!(tables.contains(&"search_index".to_string()));
assert!(tables.contains(&"schema_version".to_string()));
}
#[test]
fn test_schema_version() {
let conn = Connection::open_in_memory().unwrap();
initialize_schema(&conn).unwrap();
let version = get_schema_version(&conn).unwrap();
assert_eq!(version, SCHEMA_VERSION);
}
#[test]
fn test_schema_idempotent() {
let conn = Connection::open_in_memory().unwrap();
initialize_schema(&conn).unwrap();
initialize_schema(&conn).unwrap();
let version = get_schema_version(&conn).unwrap();
assert_eq!(version, SCHEMA_VERSION);
}
#[test]
fn test_bulk_tables_exist() {
let conn = Connection::open_in_memory().unwrap();
initialize_schema(&conn).unwrap();
let tables: Vec<String> = conn
.prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
.unwrap()
.query_map([], |row| row.get(0))
.unwrap()
.filter_map(|r| r.ok())
.collect();
assert!(tables.contains(&"bulk_export_jobs".to_string()));
assert!(tables.contains(&"bulk_export_progress".to_string()));
assert!(tables.contains(&"bulk_export_files".to_string()));
assert!(tables.contains(&"bulk_submissions".to_string()));
assert!(tables.contains(&"bulk_manifests".to_string()));
assert!(tables.contains(&"bulk_entry_results".to_string()));
assert!(tables.contains(&"bulk_submission_changes".to_string()));
}
#[test]
fn test_migration_v5_to_v6() {
let conn = Connection::open_in_memory().unwrap();
create_schema_v1(&conn).unwrap();
let _ = get_schema_version(&conn).unwrap();
migrate_v1_to_v2(&conn).unwrap();
migrate_v2_to_v3(&conn).unwrap();
migrate_v3_to_v4(&conn).unwrap();
migrate_v4_to_v5(&conn).unwrap();
set_schema_version(&conn, 5).unwrap();
let table_count: i32 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name LIKE 'bulk_%'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(table_count, 0);
migrate_v5_to_v6(&conn).unwrap();
let table_count: i32 = conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name LIKE 'bulk_%'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(table_count, 7); }
}