use std::path::{Path, PathBuf};
use commonmeta::Data;
use rusqlite::{Connection, OpenFlags, params};
use crate::error::AppError;
/// Schema for the local PID cache created by `cache_open` and filled by `cache_insert`.
///
/// `pid_records` holds one row per identifier (`pid`):
/// - `source_id` is the numeric code produced by `source_to_id`.
/// - `raw_metadata` is the record serialized by `commonmeta::write` in the format named by
///   `raw_metadata_type`, which is the same name `cache_lookup` passes back to `commonmeta::read`.
/// - `last_fetched` falls back to the SQLite `datetime('now')` default, because `cache_insert`
///   never supplies it.
/// - `last_modified` is not written anywhere in this module.
///
/// Every statement uses `IF NOT EXISTS`, so the batch can be re-run against an existing cache file.
const CACHE_DDL: &str = "
CREATE TABLE IF NOT EXISTS pid_records (
pid TEXT PRIMARY KEY NOT NULL,
source_id INTEGER NOT NULL,
resource_url TEXT NOT NULL DEFAULT '',
last_modified TEXT,
last_fetched TEXT NOT NULL DEFAULT (datetime('now')),
raw_metadata TEXT NOT NULL,
raw_metadata_type TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pid_records_fetched ON pid_records(last_fetched);
CREATE INDEX IF NOT EXISTS idx_pid_records_modified ON pid_records(last_modified);
";
/// Maps a metadata source name to the integer code stored in `pid_records.source_id`.
///
/// # Errors
/// Returns [`AppError::Internal`] for any name not in the table below.
fn source_to_id(source: &str) -> Result<i64, AppError> {
    const KNOWN_SOURCES: [(&str, i64); 3] = [("crossref", 1), ("datacite", 2), ("openalex", 3)];
    KNOWN_SOURCES
        .iter()
        .find(|(name, _)| *name == source)
        .map(|&(_, code)| code)
        .ok_or_else(|| AppError::Internal(format!("unknown source: {source}")))
}
/// Test-only `works` schema used by the unit-test fixtures to build a throwaway database.
///
/// Every text column defaults to `''`, `subjects` defaults to `'[]'`, and `metadata` is the
/// zstd-compressed commonmeta JSON read by `fetch_metadata`. The partial indexes skip rows whose
/// `pmid` / `pmcid` / `openalex` is the empty-string default.
///
/// NOTE(review): presumably mirrors the production `works` schema; confirm. It has no `arxiv`
/// column, although `lookup_by_arxiv` queries one.
#[cfg(test)]
pub(crate) const TEST_DDL: &str = r#"
CREATE TABLE works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"pmid" TEXT NOT NULL DEFAULT '',
"pmcid" TEXT NOT NULL DEFAULT '',
"openalex" TEXT NOT NULL DEFAULT '',
"valid" INTEGER NOT NULL DEFAULT 0,
"metadata" BLOB NOT NULL DEFAULT x''
);
CREATE INDEX works_pmid ON works("pmid") WHERE "pmid" != '';
CREATE INDEX works_pmcid ON works("pmcid") WHERE "pmcid" != '';
CREATE INDEX works_openalex ON works("openalex") WHERE "openalex" != ''
"#;
fn connect(path: &PathBuf) -> Result<Connection, AppError> {
Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
.map_err(|e| AppError::Internal(format!("sqlite open '{}': {e}", path.display())))
}
fn connect_rw(path: &PathBuf) -> Result<Connection, AppError> {
Connection::open(path)
.map_err(|e| AppError::Internal(format!("sqlite open rw '{}': {e}", path.display())))
}
pub fn open(path: &std::path::Path) -> Result<Option<PathBuf>, AppError> {
if !path.exists() {
tracing::warn!(path = %path.display(), "sqlite file not found, running without local database");
return Ok(None);
}
let path = path.to_path_buf();
let conn = connect(&path)?;
let has_works: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='works'",
[],
|_| Ok(true),
)
.unwrap_or(false);
if !has_works {
tracing::info!(path = %path.display(), "sqlite database has no works table, running without local database");
return Ok(None);
}
Ok(Some(path))
}
/// Returns the `id` of a pseudo-randomly chosen row of `works`, or `None` if the table is empty.
///
/// A target rowid in `1..=max(rowid)` is drawn and the first row at or after it is returned. Rows
/// that follow gaps in the rowid sequence are therefore somewhat more likely to be picked.
///
/// # Errors
/// Returns [`AppError::Internal`] if the database cannot be opened or queried.
pub fn random_doi(path: &PathBuf) -> Result<Option<String>, AppError> {
    let conn = connect(path)?;
    // The target rowid is computed in one scalar subquery, so `random()` is evaluated once.
    // The remainder is taken *before* `abs()`: SQLite raises an integer-overflow error for
    // `abs(-9223372036854775808)`, which `random()` can return.
    // On an empty table `max(rowid)` is NULL, the comparison is never true, and no row is returned.
    let result = conn.query_row(
        "SELECT id FROM works \
         WHERE rowid >= (SELECT abs(random() % max(rowid)) + 1 FROM works) \
         ORDER BY rowid \
         LIMIT 1",
        [],
        |row| row.get(0),
    );
    match result {
        Ok(id) => Ok(Some(id)),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(AppError::Internal(format!("sqlite random: {e}"))),
    }
}
/// Looks up a work by DOI in the local `works` table.
///
/// `doi` may be a bare DOI or a DOI URL; it is normalized with `normalize_doi` before matching
/// `works.id`. An input that normalizes to an empty string yields `Ok(None)` without touching the
/// database.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failures.
pub fn lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
    let normalized = commonmeta::doi_utils::normalize_doi(doi);
    if normalized.is_empty() {
        Ok(None)
    } else {
        let conn = connect(path)?;
        fetch_metadata(&conn, "SELECT metadata FROM works WHERE id = ?1", &normalized)
    }
}
/// Looks up a work by PubMed ID, matched exactly against `works.pmid`.
///
/// An empty `pmid` returns `Ok(None)` without opening the database. In the test schema, rows
/// without a PMID store the `''` default, so an empty query would otherwise match them.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failures.
pub fn lookup_by_pmid(path: &PathBuf, pmid: &str) -> Result<Option<Data>, AppError> {
    match pmid {
        "" => Ok(None),
        wanted => {
            let conn = connect(path)?;
            fetch_metadata(&conn, "SELECT metadata FROM works WHERE pmid = ?1", wanted)
        }
    }
}
/// Looks up a work by arXiv identifier, matched exactly against `works.arxiv`.
///
/// An empty `id` returns `Ok(None)` without opening the database.
///
/// NOTE(review): the test schema `TEST_DDL` has no `arxiv` column. Confirm the production `works`
/// table provides one; otherwise this query fails and returns an error.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failures.
pub fn lookup_by_arxiv(path: &PathBuf, id: &str) -> Result<Option<Data>, AppError> {
    match id {
        "" => Ok(None),
        wanted => {
            let conn = connect(path)?;
            fetch_metadata(&conn, "SELECT metadata FROM works WHERE arxiv = ?1", wanted)
        }
    }
}
/// Looks up a work by OpenAlex identifier.
///
/// Accepted forms of `id`:
/// - a bare id such as `W123`;
/// - an `http(s)://openalex.org/W123` URL;
/// - an `http(s)://openalex.org/works/W123` URL.
///
/// All of these are canonicalized to `https://openalex.org/W123` before matching
/// `works.openalex`. An input that is empty, or that is only a prefix with no id after it, returns
/// `Ok(None)` without opening the database.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failures.
pub fn lookup_by_openalex(path: &PathBuf, id: &str) -> Result<Option<Data>, AppError> {
    // The `/works/` variants come first so they win over their shorter counterparts.
    const PREFIXES: [&str; 4] = [
        "https://openalex.org/works/",
        "https://openalex.org/",
        "http://openalex.org/works/",
        "http://openalex.org/",
    ];
    // Strip at most one prefix, unlike `trim_start_matches`, which strips repeatedly.
    let bare = PREFIXES
        .iter()
        .find_map(|prefix| id.strip_prefix(*prefix))
        .unwrap_or(id);
    if bare.is_empty() {
        return Ok(None);
    }
    let canonical = format!("https://openalex.org/{bare}");
    let conn = connect(path)?;
    fetch_metadata(&conn, "SELECT metadata FROM works WHERE openalex = ?1", &canonical)
}
/// Looks up a work by PubMed Central ID (e.g. `PMC9876543`), matched exactly against `works.pmcid`.
///
/// An empty `pmcid` returns `Ok(None)` without opening the database. Rows without a PMCID store
/// `''` in the test schema, so an empty query would otherwise match them.
///
/// # Errors
/// Returns [`AppError::Internal`] on open, query, decompression or parse failures.
pub fn lookup_by_pmcid(path: &PathBuf, pmcid: &str) -> Result<Option<Data>, AppError> {
    match pmcid {
        "" => Ok(None),
        wanted => {
            let conn = connect(path)?;
            fetch_metadata(&conn, "SELECT metadata FROM works WHERE pmcid = ?1", wanted)
        }
    }
}
/// Runs a single-parameter `sql` query whose first column is a zstd-compressed commonmeta JSON
/// blob, and decodes it into [`Data`].
///
/// Returns `Ok(None)` when the query yields no row.
///
/// # Errors
/// Returns [`AppError::Internal`] when any of these steps fails:
/// - the query;
/// - zstd decompression;
/// - UTF-8 decoding;
/// - commonmeta parsing.
fn fetch_metadata(conn: &Connection, sql: &str, param: &str) -> Result<Option<Data>, AppError> {
    let compressed = match conn.query_row(sql, params![param], |row| row.get::<_, Vec<u8>>(0)) {
        Ok(bytes) => bytes,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(e) => return Err(AppError::Internal(format!("sqlite query: {e}"))),
    };
    let decoded = zstd::decode_all(&compressed[..])
        .map_err(|e| AppError::Internal(format!("zstd decompress: {e}")))?;
    let text = String::from_utf8(decoded)
        .map_err(|e| AppError::Internal(format!("metadata utf8: {e}")))?;
    let data = commonmeta::read("commonmeta", &text)
        .map_err(|e| AppError::Internal(format!("metadata parse: {e}")))?;
    Ok(Some(data))
}
/// Opens (creating if needed) the PID cache at `path`, switches it to WAL journaling and applies
/// `CACHE_DDL`.
///
/// Returns the owned cache path for use with the other `cache_*` functions.
///
/// # Errors
/// Returns [`AppError::Internal`] if the file cannot be opened, the journal-mode pragma fails, or
/// the schema batch fails.
pub fn cache_open(path: &Path) -> Result<PathBuf, AppError> {
    let cache_path = path.to_path_buf();
    let conn = connect_rw(&cache_path)?;
    // `PRAGMA journal_mode` reports the resulting mode as a row; it is read and discarded.
    conn.query_row("PRAGMA journal_mode=WAL", [], |row| row.get::<_, String>(0))
        .map_err(|e| AppError::Internal(format!("cache wal: {e}")))?;
    conn.execute_batch(CACHE_DDL)
        .map_err(|e| AppError::Internal(format!("cache init: {e}")))?;
    Ok(cache_path)
}
pub fn cache_lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT raw_metadata, raw_metadata_type FROM pid_records WHERE pid = ?1",
params![id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
);
match result {
Ok((raw, raw_type)) => commonmeta::read(&raw_type, &raw)
.map(Some)
.map_err(|e| AppError::Internal(format!("cache parse: {e}"))),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache lookup: {e}"))),
}
}
pub fn cache_lookup_url(path: &PathBuf, doi: &str) -> Result<Option<String>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT resource_url FROM pid_records WHERE pid = ?1",
params![id],
|row| row.get::<_, String>(0),
);
match result {
Ok(url) if !url.is_empty() => Ok(Some(url)),
Ok(_) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache url lookup: {e}"))),
}
}
/// Serializes `data` in the `source` format and stores it in the PID cache, replacing any existing
/// row for the same identifier.
///
/// The row key is `normalize_doi(data.id)`, the same form `cache_lookup` and `cache_lookup_url`
/// query with. If `data.id` does not normalize (empty result), the raw id is used unchanged.
///
/// # Errors
/// Returns [`AppError::Internal`] in these cases:
/// - `source` is unknown to `source_to_id`;
/// - serialization or UTF-8 conversion fails;
/// - the cache cannot be opened or written.
pub fn cache_insert(path: &PathBuf, data: &Data, source: &str) -> Result<(), AppError> {
    // Validate the source first: it is cheap and avoids serializing a record that cannot be stored.
    let sid = source_to_id(source)?;
    let raw_bytes = commonmeta::write(source, data)
        .map_err(|e| AppError::Internal(format!("serialize to {source}: {e}")))?;
    let raw_metadata = String::from_utf8(raw_bytes)
        .map_err(|e| AppError::Internal(format!("utf8: {e}")))?;
    // Without normalization, a non-canonical `data.id` would be stored under a key that no lookup
    // ever queries.
    let normalized = commonmeta::doi_utils::normalize_doi(&data.id);
    let pid: &str = if normalized.is_empty() { &data.id } else { &normalized };
    let conn = connect_rw(path)?;
    // INSERT OR REPLACE rewrites the whole row, so `last_fetched` takes its default again.
    conn.execute(
        "INSERT OR REPLACE INTO pid_records
        (pid, source_id, resource_url, raw_metadata, raw_metadata_type)
        VALUES (?1, ?2, ?3, ?4, ?5)",
        params![pid, sid, data.url, raw_metadata, source],
    )
    .map_err(|e| AppError::Internal(format!("cache insert: {e}")))?;
    Ok(())
}
#[cfg(test)]
pub(crate) mod tests {
    //! Unit tests and shared fixtures for the local works database and the PID cache.
    use super::*;
    use std::path::Path;

    /// Creates a PID cache at `path` holding one Crossref record for `10.5678/cached`, and returns
    /// the cache path.
    pub(crate) fn make_test_cache_db(path: &Path) -> PathBuf {
        let db_path = cache_open(path).expect("cache_open");
        let data = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Cached Article".to_string(),
            date_published: "2024-06-01".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&db_path, &data, "crossref").expect("cache_insert");
        db_path
    }

    /// Creates a `works` database at `path` using `TEST_DDL` and inserts two rows:
    /// - a DOI-only record (`10.1234/test`);
    /// - a record with PMID `12345678` and PMCID `PMC9876543`.
    ///
    /// Returns the database path.
    pub(crate) fn make_test_db(path: &Path) -> PathBuf {
        let conn = Connection::open(path).expect("open test db");
        conn.execute_batch(TEST_DDL).expect("create schema");
        let data = commonmeta::Data {
            id: "https://doi.org/10.1234/test".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/test-article".to_string(),
            title: "Test Article on Content Negotiation".to_string(),
            date_published: "2024-01-15".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        let compressed = compress_data(&data);
        conn.execute(
            r#"INSERT INTO works ("id","type","url","title","date_published","provider","metadata")
            VALUES (?1,?2,?3,?4,?5,?6,?7)"#,
            params![&data.id, &data.type_, &data.url, &data.title,
            &data.date_published, &data.provider, compressed],
        ).expect("insert doi-only record");
        let data_pmid = commonmeta::Data {
            id: "https://doi.org/10.1234/pmid-test".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/pmid-article".to_string(),
            title: "Article with PubMed IDs".to_string(),
            date_published: "2024-02-01".to_string(),
            provider: "PubMed".to_string(),
            identifiers: vec![
                commonmeta::data::Identifier {
                    identifier: "12345678".to_string(),
                    identifier_type: "PMID".to_string(),
                    ..Default::default()
                },
                commonmeta::data::Identifier {
                    identifier: "PMC9876543".to_string(),
                    identifier_type: "PMCID".to_string(),
                    ..Default::default()
                },
            ],
            ..commonmeta::Data::default()
        };
        let compressed_pmid = compress_data(&data_pmid);
        conn.execute(
            r#"INSERT INTO works ("id","type","url","title","date_published","provider",
            "pmid","pmcid","metadata")
            VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9)"#,
            params![&data_pmid.id, &data_pmid.type_, &data_pmid.url, &data_pmid.title,
            &data_pmid.date_published, &data_pmid.provider,
            "12345678", "PMC9876543", compressed_pmid],
        ).expect("insert pmid record");
        path.to_path_buf()
    }

    /// Serializes `data` as commonmeta JSON and zstd-compresses it, matching the `works.metadata`
    /// encoding that `fetch_metadata` decodes.
    fn compress_data(data: &commonmeta::Data) -> Vec<u8> {
        let json_bytes = commonmeta::write("commonmeta", data).unwrap();
        zstd::encode_all(json_bytes.as_slice(), 0).unwrap()
    }

    #[test]
    fn source_to_id_maps_known_sources() {
        assert_eq!(source_to_id("crossref").unwrap(), 1);
        assert_eq!(source_to_id("datacite").unwrap(), 2);
        assert_eq!(source_to_id("openalex").unwrap(), 3);
        assert!(source_to_id("not-a-real-source").is_err());
    }

    #[test]
    fn open_returns_none_for_missing_file() {
        let result = open(Path::new("/nonexistent/path/db.sqlite3"));
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    #[test]
    fn open_returns_none_when_no_works_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("no_works.sqlite3");
        let conn = Connection::open(&path).unwrap();
        conn.execute_batch("CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT);")
            .unwrap();
        drop(conn);
        let result = open(&path);
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    #[test]
    fn open_returns_path_when_works_table_present() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert_eq!(open(&path).unwrap(), Some(path.clone()));
    }

    #[test]
    fn random_doi_returns_an_existing_id() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let id = random_doi(&path).unwrap().expect("non-empty table yields an id");
        assert!(
            id == "https://doi.org/10.1234/test" || id == "https://doi.org/10.1234/pmid-test",
            "unexpected id: {id}"
        );
    }

    #[test]
    fn random_doi_returns_none_for_empty_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.sqlite3");
        let conn = Connection::open(&path).unwrap();
        conn.execute_batch(TEST_DDL).unwrap();
        drop(conn);
        assert!(random_doi(&path).unwrap().is_none());
    }

    #[test]
    fn lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let result = lookup(&path, "10.9999/does-not-exist").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn lookup_finds_existing_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup(&path, "10.1234/test").unwrap().expect("should find DOI");
        assert_eq!(data.id, "https://doi.org/10.1234/test");
        assert_eq!(data.title, "Test Article on Content Negotiation");
        assert_eq!(data.url, "https://example.com/test-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    #[test]
    fn lookup_normalises_doi_prefix_form() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        for doi in &[
            "10.1234/test",
            "https://doi.org/10.1234/test",
            "http://dx.doi.org/10.1234/test",
        ] {
            assert!(
                lookup(&path, doi).unwrap().is_some(),
                "should find DOI in form '{doi}'"
            );
        }
    }

    #[test]
    fn lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup(&path, "").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmid_finds_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup_by_pmid(&path, "12345678").unwrap().expect("should find by PMID");
        assert_eq!(data.id, "https://doi.org/10.1234/pmid-test");
    }

    #[test]
    fn lookup_by_pmid_returns_none_for_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmid(&path, "99999999").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmid_empty_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmid(&path, "").unwrap().is_none());
    }

    #[test]
    fn lookup_by_pmcid_finds_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup_by_pmcid(&path, "PMC9876543").unwrap().expect("should find by PMCID");
        assert_eq!(data.id, "https://doi.org/10.1234/pmid-test");
    }

    #[test]
    fn lookup_by_pmcid_returns_none_for_unknown() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_pmcid(&path, "PMC0000000").unwrap().is_none());
    }

    #[test]
    fn lookup_by_openalex_returns_none_for_unknown_and_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup_by_openalex(&path, "https://openalex.org/W0").unwrap().is_none());
        assert!(lookup_by_openalex(&path, "").unwrap().is_none());
    }

    #[test]
    fn cache_open_creates_pid_records_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let conn = Connection::open(&path).unwrap();
        let cols: Vec<String> = conn
            .prepare("PRAGMA table_info(pid_records)").unwrap()
            .query_map([], |row| row.get::<_, String>(1)).unwrap()
            .filter_map(|r| r.ok())
            .collect();
        for col in &["pid", "source_id", "resource_url", "last_fetched", "raw_metadata", "raw_metadata_type"] {
            assert!(cols.iter().any(|c| c == col), "missing column: {col}");
        }
    }

    #[test]
    fn cache_schema_compatible_with_vraix_transport_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let conn = Connection::open(&path).unwrap();
        let cols: Vec<String> = conn
            .prepare("PRAGMA table_info(pid_records)").unwrap()
            .query_map([], |row| row.get::<_, String>(1)).unwrap()
            .filter_map(|r| r.ok())
            .collect();
        for required in &["pid", "source_id", "raw_metadata"] {
            assert!(cols.iter().any(|c| c == required), "missing vraix-required column: {required}");
        }
    }

    #[test]
    fn cache_lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "10.9999/nope").unwrap().is_none());
    }

    #[test]
    fn cache_lookup_finds_inserted_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should find cached DOI");
        assert_eq!(data.id, "https://doi.org/10.5678/cached");
        assert_eq!(data.title, "Cached Article");
        assert_eq!(data.url, "https://example.com/cached-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    #[test]
    fn cache_lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "").unwrap().is_none());
    }

    #[test]
    fn cache_lookup_url_returns_url_without_json_parsing() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let url = cache_lookup_url(&path, "10.5678/cached").unwrap();
        assert_eq!(url.as_deref(), Some("https://example.com/cached-article"));
    }

    #[test]
    fn cache_lookup_url_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup_url(&path, "10.9999/nope").unwrap().is_none());
    }

    #[test]
    fn cache_insert_unknown_source_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let data = commonmeta::Data {
            id: "https://doi.org/10.1/x".to_string(),
            ..commonmeta::Data::default()
        };
        // "openalex" is a *known* source (id 3 in `source_to_id`), so it cannot exercise this path.
        assert!(cache_insert(&path, &data, "not-a-real-source").is_err());
    }

    #[test]
    fn cache_insert_replaces_existing_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let updated = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Updated Title".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&path, &updated, "crossref").unwrap();
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should still exist");
        assert_eq!(data.title, "Updated Title");
    }
}