use std::path::{Path, PathBuf};
use commonmeta::{Citation, Data};
use rusqlite::{Connection, OpenFlags, params};
use crate::error::AppError;
/// Schema for the read-write metadata cache created by `cache_open`.
///
/// `pid_records` holds one row per persistent identifier (`pid`): the integer
/// source id (see `source_to_id`), the resource URL, the serialized source
/// metadata (`raw_metadata`) and the name of the format it was serialized in
/// (`raw_metadata_type`). `last_fetched` defaults to the insertion time.
/// The two indexes on `last_fetched` / `last_modified` presumably serve
/// staleness or refresh queries elsewhere — NOTE(review): confirm.
///
/// Every statement uses `IF NOT EXISTS`, so running this batch against an
/// already-initialized cache leaves it unchanged.
const CACHE_DDL: &str = "
CREATE TABLE IF NOT EXISTS pid_records (
pid TEXT PRIMARY KEY NOT NULL,
source_id INTEGER NOT NULL,
resource_url TEXT NOT NULL DEFAULT '',
last_modified TEXT,
last_fetched TEXT NOT NULL DEFAULT (datetime('now')),
raw_metadata TEXT NOT NULL,
raw_metadata_type TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pid_records_fetched ON pid_records(last_fetched);
CREATE INDEX IF NOT EXISTS idx_pid_records_modified ON pid_records(last_modified);
";
/// Maps a metadata source name to the integer `source_id` stored in the
/// cache's `pid_records` table.
///
/// # Errors
/// Returns [`AppError::Internal`] for any name other than `crossref`,
/// `datacite` or `openalex`.
fn source_to_id(source: &str) -> Result<i64, AppError> {
    const KNOWN_SOURCES: [(&str, i64); 3] = [("crossref", 1), ("datacite", 2), ("openalex", 3)];
    KNOWN_SOURCES
        .iter()
        .find(|(name, _)| *name == source)
        .map(|&(_, id)| id)
        .ok_or_else(|| AppError::Internal(format!("unknown source: {source}")))
}
/// Minimal `works` schema used by the unit tests to build a local database.
///
/// Defines the columns the lookup functions filter on (`id`, `pmid`, `pmcid`,
/// `openalex`) plus the zstd-compressed `metadata` BLOB. It has no `arxiv`
/// column and no `works_references`, `organizations` or `people` tables, so
/// fixtures built from it do not cover those lookups.
#[cfg(test)]
pub(crate) const TEST_DDL: &str = r#"
CREATE TABLE works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"pmid" TEXT NOT NULL DEFAULT '',
"pmcid" TEXT NOT NULL DEFAULT '',
"openalex" TEXT NOT NULL DEFAULT '',
"valid" INTEGER NOT NULL DEFAULT 0,
"metadata" BLOB NOT NULL DEFAULT x''
);
CREATE INDEX works_pmid ON works("pmid") WHERE "pmid" != '';
CREATE INDEX works_pmcid ON works("pmcid") WHERE "pmcid" != '';
CREATE INDEX works_openalex ON works("openalex") WHERE "openalex" != ''
"#;
fn connect(path: &PathBuf) -> Result<Connection, AppError> {
Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
.map_err(|e| AppError::Internal(format!("sqlite open '{}': {e}", path.display())))
}
fn connect_rw(path: &PathBuf) -> Result<Connection, AppError> {
Connection::open(path)
.map_err(|e| AppError::Internal(format!("sqlite open rw '{}': {e}", path.display())))
}
/// Validates the optional local metadata database at `path`.
///
/// Returns `Ok(None)`, after logging the reason, when the file does not exist
/// or when it has no `works` table. Otherwise returns `Ok(Some(path))` as an
/// owned path for the lookup functions. Any error while querying
/// `sqlite_master` is treated the same as "no works table".
///
/// # Errors
/// Returns [`AppError::Internal`] if the file exists but cannot be opened
/// read-only.
pub fn open(path: &std::path::Path) -> Result<Option<PathBuf>, AppError> {
    if !path.exists() {
        tracing::warn!(path = %path.display(), "sqlite file not found, running without local database");
        return Ok(None);
    }
    let db_path = path.to_path_buf();
    let conn = connect(&db_path)?;
    let works_declared: bool = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name='works'",
            [],
            |_| Ok(true),
        )
        .unwrap_or(false);
    if works_declared {
        Ok(Some(db_path))
    } else {
        tracing::info!(path = %db_path.display(), "sqlite database has no works table, running without local database");
        Ok(None)
    }
}
pub fn random_id(path: &PathBuf) -> Result<Option<String>, AppError> {
let conn = connect(path)?;
let mut tables: Vec<&str> = Vec::new();
let works_ok: bool = conn
.query_row("SELECT 1 FROM works LIMIT 1", [], |_| Ok(true))
.unwrap_or(false);
if works_ok {
tables.push("works");
}
for tbl in &["organizations", "people"] {
let exists: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
params![tbl],
|_| Ok(true),
)
.unwrap_or(false);
if exists {
let has_rows: bool = conn
.query_row(&format!("SELECT 1 FROM {tbl} LIMIT 1"), [], |_| Ok(true))
.unwrap_or(false);
if has_rows {
tables.push(tbl);
}
}
}
if tables.is_empty() {
return Ok(None);
}
let n = tables.len() as i64;
let idx = conn
.query_row("SELECT abs(random()) % ?1", params![n], |r| r.get::<_, i64>(0))
.unwrap_or(0) as usize;
let table = tables[idx];
let query = format!(
"SELECT id FROM {table} \
WHERE rowid >= abs(random()) % (SELECT max(rowid) FROM {table}) + 1 \
LIMIT 1"
);
match conn.query_row(&query, [], |row| row.get(0)) {
Ok(id) => Ok(Some(id)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("sqlite random: {e}"))),
}
}
/// Looks up a work in the local `works` table by DOI.
///
/// The DOI is normalized first, so the bare `10.x/y` form and URL forms find
/// the same row. Returns `Ok(None)` when the input does not normalize to a DOI
/// or no row matches.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failure.
pub fn lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
    let normalized = commonmeta::doi_utils::normalize_doi(doi);
    if normalized.is_empty() {
        return Ok(None);
    }
    let connection = connect(path)?;
    fetch_metadata(&connection, "SELECT metadata FROM works WHERE id = ?1", &normalized)
}
/// Returns up to 50 works whose references include `doi`, read from the
/// `works_references` table (`work_id` rows whose `ref_id` equals the DOI).
///
/// An empty list is returned in two cases, instead of an error:
/// - `doi` does not normalize to a DOI.
/// - The database has no `works_references` table.
///
/// This matches how the organization lookups treat a missing `organizations`
/// table. Rows whose `work_id` cannot be read as text are skipped.
///
/// # Errors
/// Returns [`AppError::Internal`] if the database cannot be opened or the
/// query cannot be prepared or executed.
pub fn lookup_citing_dois(path: &PathBuf, doi: &str) -> Result<Vec<Citation>, AppError> {
    let id = commonmeta::doi_utils::normalize_doi(doi);
    if id.is_empty() {
        return Ok(vec![]);
    }
    let conn = connect(path)?;
    // `open` only requires a `works` table, so citation data may be absent;
    // without this guard every call would fail with "no such table".
    let has_refs: bool = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name='works_references'",
            [],
            |_| Ok(true),
        )
        .unwrap_or(false);
    if !has_refs {
        return Ok(vec![]);
    }
    let mut stmt = conn
        .prepare("SELECT work_id FROM works_references WHERE ref_id = ?1 LIMIT 50")
        .map_err(|e| AppError::Internal(format!("prepare citing: {e}")))?;
    let citing = stmt
        .query_map(params![id], |row| row.get::<_, String>(0))
        .map_err(|e| AppError::Internal(format!("query citing: {e}")))?
        // Best-effort: skip rows whose `work_id` is not readable as a String.
        .filter_map(|r| r.ok())
        .map(|work_id| Citation { id: work_id, ..Default::default() })
        .collect();
    Ok(citing)
}
/// Looks up a work by its PubMed ID (the `pmid` column).
///
/// The value is matched verbatim. An empty `pmid` yields `Ok(None)` without
/// opening the database.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failure.
pub fn lookup_by_pmid(path: &PathBuf, pmid: &str) -> Result<Option<Data>, AppError> {
    match pmid {
        "" => Ok(None),
        value => {
            let conn = connect(path)?;
            fetch_metadata(&conn, "SELECT metadata FROM works WHERE pmid = ?1", value)
        }
    }
}
/// Looks up a work by its arXiv identifier (the `arxiv` column), matched verbatim.
///
/// An empty `id` yields `Ok(None)` without opening the database.
/// NOTE(review): assumes the production `works` table has an `arxiv` column;
/// the test schema in this file does not define one, so this path is untested.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failure.
pub fn lookup_by_arxiv(path: &PathBuf, id: &str) -> Result<Option<Data>, AppError> {
    if !id.is_empty() {
        let conn = connect(path)?;
        return fetch_metadata(&conn, "SELECT metadata FROM works WHERE arxiv = ?1", id);
    }
    Ok(None)
}
/// Looks up a work by OpenAlex ID, accepting either a bare ID or an
/// `http(s)://openalex.org/[works/]` URL.
///
/// The ID is canonicalized to `https://openalex.org/{id}` before matching the
/// `openalex` column. An empty `id` yields `Ok(None)`.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failure.
pub fn lookup_by_openalex(path: &PathBuf, id: &str) -> Result<Option<Data>, AppError> {
    if id.is_empty() {
        return Ok(None);
    }
    // Applied in this order; `trim_start_matches` removes each prefix
    // repeatedly, not just once.
    const URL_PREFIXES: [&str; 4] = [
        "https://openalex.org/works/",
        "https://openalex.org/",
        "http://openalex.org/works/",
        "http://openalex.org/",
    ];
    let bare = URL_PREFIXES
        .iter()
        .fold(id, |rest, &prefix| rest.trim_start_matches(prefix));
    let canonical_url = format!("https://openalex.org/{bare}");
    let conn = connect(path)?;
    fetch_metadata(&conn, "SELECT metadata FROM works WHERE openalex = ?1", &canonical_url)
}
/// Looks up a work by its PubMed Central ID (the `pmcid` column), matched verbatim.
///
/// An empty `pmcid` yields `Ok(None)` without opening the database.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failure.
pub fn lookup_by_pmcid(path: &PathBuf, pmcid: &str) -> Result<Option<Data>, AppError> {
    if pmcid.is_empty() {
        Ok(None)
    } else {
        let db = connect(path)?;
        fetch_metadata(&db, "SELECT metadata FROM works WHERE pmcid = ?1", pmcid)
    }
}
pub fn lookup_ror_types(path: &PathBuf, ror_url: &str) -> Result<Vec<String>, AppError> {
if ror_url.is_empty() {
return Ok(Vec::new());
}
let conn = connect(path)?;
let has_orgs: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='organizations'",
[],
|_| Ok(true),
)
.unwrap_or(false);
if !has_orgs {
return Ok(Vec::new());
}
match conn.query_row(
"SELECT types FROM organizations WHERE id = ?1",
params![ror_url],
|row| row.get::<_, String>(0),
) {
Ok(json) => Ok(serde_json::from_str::<Vec<String>>(&json).unwrap_or_default()),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(Vec::new()),
Err(e) => Err(AppError::Internal(format!("sqlite ror types: {e}"))),
}
}
/// Looks up an organization by ROR URL and parses its stored record with the
/// `ror` reader.
///
/// The `metadata` column is zstd-compressed UTF-8. Returns `Ok(None)` when
/// `ror_url` is empty, the database has no `organizations` table, or no row
/// matches.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression, UTF-8 or
/// parse failure.
pub fn lookup_by_ror(path: &PathBuf, ror_url: &str) -> Result<Option<Data>, AppError> {
    if ror_url.is_empty() {
        return Ok(None);
    }
    let conn = connect(path)?;
    let organizations_exist: bool = conn
        .query_row(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name='organizations'",
            [],
            |_| Ok(true),
        )
        .unwrap_or(false);
    if !organizations_exist {
        return Ok(None);
    }
    let compressed = match conn.query_row(
        "SELECT metadata FROM organizations WHERE id = ?1",
        params![ror_url],
        |row| row.get::<_, Vec<u8>>(0),
    ) {
        Ok(bytes) => bytes,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(e) => return Err(AppError::Internal(format!("sqlite ror query: {e}"))),
    };
    let decompressed = zstd::decode_all(&compressed[..])
        .map_err(|e| AppError::Internal(format!("zstd decompress: {e}")))?;
    let text = String::from_utf8(decompressed)
        .map_err(|e| AppError::Internal(format!("metadata utf8: {e}")))?;
    let record = commonmeta::read("ror", &text)
        .map_err(|e| AppError::Internal(format!("ror parse: {e}")))?;
    Ok(Some(record))
}
/// Runs `sql` with the single bound parameter `param` and decodes the first
/// column of the first row as a zstd-compressed, UTF-8 commonmeta record.
///
/// `sql` must select exactly one BLOB column and take one `?1` parameter.
/// Returns `Ok(None)` when the query yields no rows.
///
/// # Errors
/// Returns [`AppError::Internal`] on query, decompression, UTF-8 or parse failure.
fn fetch_metadata(conn: &Connection, sql: &str, param: &str) -> Result<Option<Data>, AppError> {
    let compressed = match conn.query_row(sql, params![param], |row| row.get::<_, Vec<u8>>(0)) {
        Ok(bytes) => bytes,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(other) => return Err(AppError::Internal(format!("sqlite query: {other}"))),
    };
    let json_text = zstd::decode_all(compressed.as_slice())
        .map_err(|e| AppError::Internal(format!("zstd decompress: {e}")))
        .and_then(|bytes| {
            String::from_utf8(bytes).map_err(|e| AppError::Internal(format!("metadata utf8: {e}")))
        })?;
    commonmeta::read("commonmeta", &json_text)
        .map(Some)
        .map_err(|e| AppError::Internal(format!("metadata parse: {e}")))
}
/// Opens the read-write metadata cache at `path`, creating the file if needed.
///
/// It then switches the database to WAL journaling and ensures the
/// `pid_records` schema exists. Returns the path as an owned [`PathBuf`] for
/// the other `cache_*` functions.
///
/// # Errors
/// Returns [`AppError::Internal`] if the file cannot be opened, the journal
/// mode cannot be set, or the schema cannot be created.
pub fn cache_open(path: &Path) -> Result<PathBuf, AppError> {
    let db_path = path.to_path_buf();
    let conn = connect_rw(&db_path)?;
    // `PRAGMA journal_mode` answers with a row (the resulting mode), so it is
    // run through `query_row` and the returned mode is discarded.
    conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get::<_, String>(0))
        .map_err(|e| AppError::Internal(format!("cache wal: {e}")))?;
    conn.execute_batch(CACHE_DDL)
        .map_err(|e| AppError::Internal(format!("cache init: {e}")))?;
    Ok(db_path)
}
pub fn cache_lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT raw_metadata, raw_metadata_type FROM pid_records WHERE pid = ?1",
params![id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
);
match result {
Ok((raw, raw_type)) => commonmeta::read(&raw_type, &raw)
.map(Some)
.map_err(|e| AppError::Internal(format!("cache parse: {e}"))),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache lookup: {e}"))),
}
}
#[allow(dead_code)]
pub fn cache_lookup_url(path: &PathBuf, doi: &str) -> Result<Option<String>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT resource_url FROM pid_records WHERE pid = ?1",
params![id],
|row| row.get::<_, String>(0),
);
match result {
Ok(url) if !url.is_empty() => Ok(Some(url)),
Ok(_) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache url lookup: {e}"))),
}
}
/// Serializes `data` in the `source` format and upserts it into the cache's
/// `pid_records` table, replacing any existing row for the same pid.
///
/// The row is keyed by `normalize_doi(data.id)`, the same form that
/// `cache_lookup` and `cache_lookup_url` query with. A record whose id is in
/// another DOI spelling (e.g. bare `10.x/y` rather than the `https://doi.org/`
/// URL) is therefore still found later. If `data.id` does not normalize to a
/// DOI, it is stored verbatim.
///
/// # Errors
/// Returns [`AppError::Internal`] in these cases:
/// - `source` cannot be written.
/// - The output is not UTF-8.
/// - `source` has no `source_id` mapping.
/// - The insert fails.
pub fn cache_insert(path: &PathBuf, data: &Data, source: &str) -> Result<(), AppError> {
    let raw_bytes = commonmeta::write(source, data)
        .map_err(|e| AppError::Internal(format!("serialize to {source}: {e}")))?;
    let raw_metadata = String::from_utf8(raw_bytes)
        .map_err(|e| AppError::Internal(format!("utf8: {e}")))?;
    let sid = source_to_id(source)?;
    // Key the row exactly as the lookup functions do. Storing `data.id` as-is
    // would make records with a non-canonical DOI form unreachable.
    let normalized = commonmeta::doi_utils::normalize_doi(&data.id);
    let pid: &str = if normalized.is_empty() { &data.id } else { &normalized };
    let conn = connect_rw(path)?;
    conn.execute(
        "INSERT OR REPLACE INTO pid_records
(pid, source_id, resource_url, raw_metadata, raw_metadata_type)
VALUES (?1, ?2, ?3, ?4, ?5)",
        params![pid, sid, data.url, raw_metadata, source],
    )
    .map_err(|e| AppError::Internal(format!("cache insert: {e}")))?;
    Ok(())
}
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::path::Path;

    /// Creates a cache database at `path` containing one Crossref-sourced
    /// record (`10.5678/cached`) and returns its path.
    pub(crate) fn make_test_cache_db(path: &Path) -> PathBuf {
        let db_path = cache_open(path).expect("cache_open");
        let data = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Cached Article".to_string(),
            date_published: "2024-06-01".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&db_path, &data, "crossref").expect("cache_insert");
        db_path
    }

    /// Creates a local `works` database at `path` using `TEST_DDL` and returns
    /// its path.
    ///
    /// It holds two records:
    /// - `10.1234/test`, a DOI-only work.
    /// - `10.1234/pmid-test`, which also carries PMID `12345678` and PMCID
    ///   `PMC9876543`.
    pub(crate) fn make_test_db(path: &Path) -> PathBuf {
        let conn = Connection::open(path).expect("open test db");
        conn.execute_batch(TEST_DDL).expect("create schema");
        let data = commonmeta::Data {
            id: "https://doi.org/10.1234/test".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/test-article".to_string(),
            title: "Test Article on Content Negotiation".to_string(),
            date_published: "2024-01-15".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        let compressed = compress_data(&data);
        conn.execute(
            r#"INSERT INTO works ("id","type","url","title","date_published","provider","metadata")
               VALUES (?1,?2,?3,?4,?5,?6,?7)"#,
            params![&data.id, &data.type_, &data.url, &data.title,
                    &data.date_published, &data.provider, compressed],
        ).expect("insert doi-only record");
        let data_pmid = commonmeta::Data {
            id: "https://doi.org/10.1234/pmid-test".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/pmid-article".to_string(),
            title: "Article with PubMed IDs".to_string(),
            date_published: "2024-02-01".to_string(),
            provider: "PubMed".to_string(),
            identifiers: vec![
                commonmeta::data::Identifier {
                    identifier: "12345678".to_string(),
                    identifier_type: "PMID".to_string(),
                    ..Default::default()
                },
                commonmeta::data::Identifier {
                    identifier: "PMC9876543".to_string(),
                    identifier_type: "PMCID".to_string(),
                    ..Default::default()
                },
            ],
            ..commonmeta::Data::default()
        };
        let compressed_pmid = compress_data(&data_pmid);
        conn.execute(
            r#"INSERT INTO works ("id","type","url","title","date_published","provider",
               "pmid","pmcid","metadata")
               VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9)"#,
            params![&data_pmid.id, &data_pmid.type_, &data_pmid.url, &data_pmid.title,
                    &data_pmid.date_published, &data_pmid.provider,
                    "12345678", "PMC9876543", compressed_pmid],
        ).expect("insert pmid record");
        path.to_path_buf()
    }

    /// Serializes `data` as commonmeta and zstd-compresses it, producing the
    /// on-disk format that `fetch_metadata` decodes.
    fn compress_data(data: &commonmeta::Data) -> Vec<u8> {
        let json_bytes = commonmeta::write("commonmeta", data).unwrap();
        zstd::encode_all(json_bytes.as_slice(), 0).unwrap()
    }

    #[test]
    fn open_returns_none_for_missing_file() {
        let result = open(Path::new("/nonexistent/path/db.sqlite3"));
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    #[test]
    fn open_returns_none_when_no_works_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("no_works.sqlite3");
        let conn = Connection::open(&path).unwrap();
        conn.execute_batch("CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT);")
            .unwrap();
        drop(conn);
        let result = open(&path);
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    #[test]
    fn lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let result = lookup(&path, "10.9999/does-not-exist").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn lookup_finds_existing_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup(&path, "10.1234/test").unwrap().expect("should find DOI");
        assert_eq!(data.id, "https://doi.org/10.1234/test");
        assert_eq!(data.title, "Test Article on Content Negotiation");
        assert_eq!(data.url, "https://example.com/test-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    #[test]
    fn lookup_normalises_doi_prefix_form() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        for doi in &[
            "10.1234/test",
            "https://doi.org/10.1234/test",
            "http://dx.doi.org/10.1234/test",
        ] {
            assert!(
                lookup(&path, doi).unwrap().is_some(),
                "should find DOI in form '{doi}'"
            );
        }
    }

    #[test]
    fn lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup(&path, "").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmid_finds_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup_by_pmid(&path, "12345678").unwrap().expect("should find by PMID");
        assert_eq!(data.id, "https://doi.org/10.1234/pmid-test");
    }

    #[test]
    fn lookup_by_pmid_returns_none_for_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmid(&path, "99999999").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmid_empty_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmid(&path, "").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmcid_finds_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup_by_pmcid(&path, "PMC9876543").unwrap().expect("should find by PMCID");
        assert_eq!(data.id, "https://doi.org/10.1234/pmid-test");
    }

    #[test]
    fn lookup_by_pmcid_returns_none_for_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmcid(&path, "PMC0000000").unwrap().is_none());
    }

    #[test]
    fn cache_open_creates_pid_records_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let conn = Connection::open(&path).unwrap();
        let cols: Vec<String> = conn
            .prepare("PRAGMA table_info(pid_records)").unwrap()
            .query_map([], |row| row.get::<_, String>(1)).unwrap()
            .filter_map(|r| r.ok())
            .collect();
        for col in &["pid", "source_id", "resource_url", "last_fetched", "raw_metadata", "raw_metadata_type"] {
            assert!(cols.iter().any(|c| c == col), "missing column: {col}");
        }
    }

    #[test]
    fn cache_schema_compatible_with_vraix_transport_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let conn = Connection::open(&path).unwrap();
        let cols: Vec<String> = conn
            .prepare("PRAGMA table_info(pid_records)").unwrap()
            .query_map([], |row| row.get::<_, String>(1)).unwrap()
            .filter_map(|r| r.ok())
            .collect();
        for required in &["pid", "source_id", "raw_metadata"] {
            assert!(cols.iter().any(|c| c == required), "missing vraix-required column: {required}");
        }
    }

    #[test]
    fn cache_lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "10.9999/nope").unwrap().is_none());
    }

    #[test]
    fn cache_lookup_finds_inserted_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should find cached DOI");
        assert_eq!(data.id, "https://doi.org/10.5678/cached");
        assert_eq!(data.title, "Cached Article");
        assert_eq!(data.url, "https://example.com/cached-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    #[test]
    fn cache_lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "").unwrap().is_none());
    }

    #[test]
    fn cache_lookup_url_returns_url_without_json_parsing() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let url = cache_lookup_url(&path, "10.5678/cached").unwrap();
        assert_eq!(url.as_deref(), Some("https://example.com/cached-article"));
    }

    #[test]
    fn cache_lookup_url_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup_url(&path, "10.9999/nope").unwrap().is_none());
    }

    #[test]
    fn cache_insert_unknown_source_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let data = commonmeta::Data {
            id: "https://doi.org/10.1/x".to_string(),
            ..commonmeta::Data::default()
        };
        // "openalex" is a *known* source in `source_to_id`, so it cannot
        // exercise the unknown-source path; use a name nothing accepts.
        assert!(cache_insert(&path, &data, "not-a-source").is_err());
    }

    #[test]
    fn source_to_id_maps_known_sources() {
        assert!(matches!(source_to_id("crossref"), Ok(1)));
        assert!(matches!(source_to_id("datacite"), Ok(2)));
        assert!(matches!(source_to_id("openalex"), Ok(3)));
    }

    #[test]
    fn source_to_id_rejects_unknown_source() {
        assert!(source_to_id("not-a-source").is_err());
        assert!(source_to_id("").is_err());
    }

    #[test]
    fn cache_insert_replaces_existing_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let updated = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Updated Title".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&path, &updated, "crossref").unwrap();
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should still exist");
        assert_eq!(data.title, "Updated Title");
    }
}