use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use cortex_store::{
open, sqlite_compile_options, verify_sqlite_compile_options,
verify_sqlite_load_extension_disabled, StoreError,
};
use rusqlite::Connection;
/// Builds a per-test path under the system temp directory.
///
/// The final path component combines `label`, the current process id and the
/// current time in nanoseconds since the Unix epoch. The directory itself is
/// not created here.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn temp_dir(label: &str) -> PathBuf {
    let pid = std::process::id();
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock after unix epoch");
    let nanos = since_epoch.as_nanos();

    let mut path = std::env::temp_dir();
    path.push(format!(
        "cortex-store-sqlite-hardening-{label}-{}-{nanos}",
        pid
    ));
    path
}
/// A compile-option list without any extension-loading flag passes the guard.
#[test]
fn sqlite_compile_options_accept_safe_options() {
    let safe_options = ["THREADSAFE=1", "OMIT_LOAD_EXTENSION"];
    let verdict = verify_sqlite_compile_options(safe_options);
    verdict.expect("safe sqlite compile options pass");
}
/// `ENABLE_LOAD_EXTENSION` in the option list must yield a validation error
/// whose message names the offending option.
#[test]
fn sqlite_compile_options_reject_enable_load_extension() {
    let outcome = verify_sqlite_compile_options(["THREADSAFE=1", "ENABLE_LOAD_EXTENSION"]);
    let err = outcome.expect_err("ENABLE_LOAD_EXTENSION fails closed");

    // Only a `Validation` error that mentions the option counts as a pass.
    let names_option = match &err {
        StoreError::Validation(message) => message.contains("ENABLE_LOAD_EXTENSION"),
        _ => false,
    };
    assert!(names_option, "unexpected error: {err}");
}
/// The `SQLITE_`-prefixed, `=1`-suffixed spelling of the option must also be
/// rejected with a validation error that names the prefixed option.
#[test]
fn sqlite_compile_options_reject_prefixed_enable_load_extension() {
    let prefixed_options = ["SQLITE_ENABLE_LOAD_EXTENSION=1"];
    let err = verify_sqlite_compile_options(prefixed_options)
        .expect_err("SQLITE_ENABLE_LOAD_EXTENSION fails closed");

    // Only a `Validation` error that mentions the prefixed name counts as a pass.
    let names_option = match &err {
        StoreError::Validation(message) => message.contains("SQLITE_ENABLE_LOAD_EXTENSION"),
        _ => false,
    };
    assert!(names_option, "unexpected error: {err}");
}
/// Runs the compile-option guard against the options reported by the linked
/// SQLite library.
///
/// If the library reports `ENABLE_LOAD_EXTENSION`, the guard must return a
/// validation error naming it; otherwise the guard must succeed.
#[test]
fn sqlite_bundled_compile_options_are_guarded() {
    let conn = Connection::open_in_memory().expect("open in-memory sqlite");
    let reported = sqlite_compile_options(&conn).expect("read sqlite compile options");
    assert!(!reported.is_empty(), "sqlite should report compile options");

    let verdict = verify_sqlite_compile_options(reported.iter().map(String::as_str));
    let extension_loading_enabled = reported
        .iter()
        .any(|option| option == "ENABLE_LOAD_EXTENSION");

    // Common case first: nothing suspicious reported, so the guard must pass.
    if !extension_loading_enabled {
        verdict.expect("bundled sqlite without ENABLE_LOAD_EXTENSION passes hardening guard");
        return;
    }

    let failed_closed = match &verdict {
        Err(StoreError::Validation(message)) => message.contains("ENABLE_LOAD_EXTENSION"),
        _ => false,
    };
    assert!(
        failed_closed,
        "bundled sqlite with ENABLE_LOAD_EXTENSION must fail closed"
    );
}
/// A freshly opened in-memory connection must pass the runtime
/// load-extension guard.
#[test]
fn sqlite_load_extension_is_disabled_on_current_connection() {
    let conn = Connection::open_in_memory().expect("open in-memory sqlite");
    let guard_result = verify_sqlite_load_extension_disabled(&conn);
    guard_result.expect("SQL load_extension must not be authorized by default");
}
/// Opens a store in a fresh temp directory and checks how many rows the
/// `_migrations` table reports.
///
/// Every fallible step is captured as a value first, so the temp directory is
/// removed even when opening, querying, or the count check fails. The checks
/// then run in the same order and with the same messages as before: open,
/// query, count, cleanup.
#[test]
fn sqlite_open_applies_runtime_hardening_guard_and_migrations() {
    // NOTE(review): presumably the number of migrations shipped with the
    // store; confirm, and bump it whenever a migration is added.
    const EXPECTED_MIGRATIONS: u64 = 10;

    let dir = temp_dir("open");

    // The handle is moved into the closure and dropped when it returns, so
    // nothing holds the database open while the directory is deleted below.
    let outcome = open(&dir).map(|pool| {
        pool.query_row("SELECT COUNT(*) FROM _migrations;", [], |row| row.get(0))
    });

    // Clean up before asserting so a failed check cannot leak the directory.
    let cleanup = if dir.exists() {
        std::fs::remove_dir_all(&dir)
    } else {
        Ok(())
    };

    let applied: u64 = outcome
        .expect("open hardened store")
        .expect("query applied migrations");
    assert_eq!(applied, EXPECTED_MIGRATIONS);
    cleanup.expect("remove temp sqlite dir");
}