use std::path::{Path, PathBuf};
use commonmeta::Data;
use rusqlite::{Connection, OpenFlags, params};
use crate::error::AppError;
/// Schema for the writable cache database, run by `cache_open` through
/// `execute_batch`.
///
/// Defines the `pid_records` table (primary key `pid`) and two indexes, one on
/// `last_fetched` and one on `last_modified`. Every statement is guarded by
/// `IF NOT EXISTS`, so the batch can be re-run against an existing cache file.
/// `last_fetched` defaults to `datetime('now')` when a row is inserted without it.
const CACHE_DDL: &str = "
CREATE TABLE IF NOT EXISTS pid_records (
pid TEXT PRIMARY KEY NOT NULL,
source_id INTEGER NOT NULL,
resource_url TEXT NOT NULL DEFAULT '',
last_modified TEXT,
last_fetched TEXT NOT NULL DEFAULT (datetime('now')),
raw_metadata TEXT NOT NULL,
raw_metadata_type TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pid_records_fetched ON pid_records(last_fetched);
CREATE INDEX IF NOT EXISTS idx_pid_records_modified ON pid_records(last_modified);
";
/// Translates a metadata source name into the numeric `source_id` stored in
/// `pid_records`.
///
/// Only the exact lowercase names `"crossref"` (1) and `"datacite"` (2) are
/// recognised.
///
/// # Errors
///
/// Returns [`AppError::Internal`] with the message `unknown source: <name>` for
/// any other input.
fn source_to_id(source: &str) -> Result<i64, AppError> {
    let id = match source {
        "crossref" => 1,
        "datacite" => 2,
        unknown => return Err(AppError::Internal(format!("unknown source: {unknown}"))),
    };
    Ok(id)
}
/// Schema of the `works` table used to build fixture databases in tests.
///
/// `tests::make_test_db` executes this batch and then inserts one row whose
/// `metadata` column holds zstd-compressed commonmeta JSON, which is the form
/// `lookup` decodes.
#[cfg(test)]
pub(crate) const TEST_DDL: &str = r#"
CREATE TABLE works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"metadata" BLOB NOT NULL DEFAULT x''
)
"#;
/// Opens the SQLite database at `path` with `SQLITE_OPEN_READ_ONLY`.
///
/// Takes `&Path` rather than `&PathBuf`; callers holding a `&PathBuf` still
/// compile through deref coercion.
///
/// # Errors
///
/// Returns [`AppError::Internal`] carrying the path and the underlying rusqlite
/// error when the connection cannot be opened.
fn connect(path: &Path) -> Result<Connection, AppError> {
    Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_ONLY)
        .map_err(|e| AppError::Internal(format!("sqlite open '{}': {e}", path.display())))
}
/// Opens the SQLite database at `path` with rusqlite's default flags
/// (`Connection::open`), which is the connection used for cache writes.
///
/// Takes `&Path` rather than `&PathBuf`; callers holding a `&PathBuf` still
/// compile through deref coercion.
///
/// # Errors
///
/// Returns [`AppError::Internal`] carrying the path and the underlying rusqlite
/// error when the connection cannot be opened.
fn connect_rw(path: &Path) -> Result<Connection, AppError> {
    Connection::open(path)
        .map_err(|e| AppError::Internal(format!("sqlite open rw '{}': {e}", path.display())))
}
pub fn open(path: &std::path::Path) -> Result<Option<PathBuf>, AppError> {
if !path.exists() {
tracing::warn!(path = %path.display(), "sqlite file not found, running without local database");
return Ok(None);
}
let path = path.to_path_buf();
let conn = connect(&path)?;
let has_works: bool = conn
.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='works'",
[],
|_| Ok(true),
)
.unwrap_or(false);
if !has_works {
tracing::info!(path = %path.display(), "sqlite database has no works table, running without local database");
return Ok(None);
}
Ok(Some(path))
}
/// Picks the `id` of one row from the `works` table at a random rowid offset.
///
/// Returns `Ok(None)` when the query yields no row.
///
/// # Errors
///
/// Returns [`AppError::Internal`] when the database cannot be opened or the
/// query fails for any reason other than returning no rows.
pub fn random_doi(path: &PathBuf) -> Result<Option<String>, AppError> {
    const PICK_RANDOM_ID: &str = "SELECT id FROM works \
        WHERE rowid >= abs(random()) % (SELECT max(rowid) FROM works) + 1 \
        LIMIT 1";

    let db = connect(path)?;
    let picked = db.query_row(PICK_RANDOM_ID, [], |row| row.get::<_, String>(0));
    match picked {
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(err) => Err(AppError::Internal(format!("sqlite random: {err}"))),
        Ok(doi) => Ok(Some(doi)),
    }
}
/// Looks up `doi` in the `works` table and decodes its stored metadata.
///
/// The DOI is first passed through `commonmeta::doi_utils::normalize_doi`; an
/// empty result short-circuits to `Ok(None)` without touching the database.
/// The `metadata` blob is zstd-decompressed, validated as UTF-8 and parsed with
/// `commonmeta::read("commonmeta", ..)`.
///
/// Returns `Ok(None)` when no row matches.
///
/// # Errors
///
/// Returns [`AppError::Internal`] when opening the database, running the query,
/// decompressing, UTF-8 decoding or parsing fails.
pub fn lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
    let normalized = commonmeta::doi_utils::normalize_doi(doi);
    if normalized.is_empty() {
        return Ok(None);
    }

    let db = connect(path)?;
    let compressed = match db.query_row(
        "SELECT metadata FROM works WHERE id = ?1",
        params![normalized],
        |row| row.get::<_, Vec<u8>>(0),
    ) {
        Ok(bytes) => bytes,
        Err(rusqlite::Error::QueryReturnedNoRows) => return Ok(None),
        Err(err) => return Err(AppError::Internal(format!("sqlite query: {err}"))),
    };

    let decoded = zstd::decode_all(compressed.as_slice())
        .map_err(|err| AppError::Internal(format!("zstd decompress: {err}")))?;
    let text = String::from_utf8(decoded)
        .map_err(|err| AppError::Internal(format!("metadata utf8: {err}")))?;

    commonmeta::read("commonmeta", &text)
        .map(Some)
        .map_err(|err| AppError::Internal(format!("metadata parse: {err}")))
}
/// Prepares the cache database at `path` and returns the path as a `PathBuf`.
///
/// Opens a read-write connection, issues `PRAGMA journal_mode=WAL`, and runs
/// [`CACHE_DDL`] to create the `pid_records` table and its indexes if absent.
///
/// # Errors
///
/// Returns [`AppError::Internal`] when the connection, the pragma or the schema
/// batch fails.
pub fn cache_open(path: &Path) -> Result<PathBuf, AppError> {
    let db_path = path.to_path_buf();
    let db = connect_rw(&db_path)?;

    // The pragma answers with a single text row; it is read and then discarded.
    db.query_row("PRAGMA journal_mode=WAL", [], |row| row.get::<_, String>(0))
        .map_err(|err| AppError::Internal(format!("cache wal: {err}")))?;

    db.execute_batch(CACHE_DDL)
        .map_err(|err| AppError::Internal(format!("cache init: {err}")))?;

    Ok(db_path)
}
pub fn cache_lookup(path: &PathBuf, doi: &str) -> Result<Option<Data>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT raw_metadata, raw_metadata_type FROM pid_records WHERE pid = ?1",
params![id],
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
);
match result {
Ok((raw, raw_type)) => commonmeta::read(&raw_type, &raw)
.map(Some)
.map_err(|e| AppError::Internal(format!("cache parse: {e}"))),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache lookup: {e}"))),
}
}
pub fn cache_lookup_url(path: &PathBuf, doi: &str) -> Result<Option<String>, AppError> {
let id = commonmeta::doi_utils::normalize_doi(doi);
if id.is_empty() {
return Ok(None);
}
let conn = connect(path)?;
let result = conn.query_row(
"SELECT resource_url FROM pid_records WHERE pid = ?1",
params![id],
|row| row.get::<_, String>(0),
);
match result {
Ok(url) if !url.is_empty() => Ok(Some(url)),
Ok(_) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(AppError::Internal(format!("cache url lookup: {e}"))),
}
}
/// Stores `data` in the cache, replacing any existing row with the same `pid`.
///
/// The record is serialised with `commonmeta::write(source, data)` and saved as
/// text in `raw_metadata`, with `source` recorded as `raw_metadata_type` and
/// its numeric id (see [`source_to_id`]) as `source_id`. `data.id` becomes the
/// `pid` and `data.url` the `resource_url`.
///
/// # Errors
///
/// Returns [`AppError::Internal`] when serialisation fails, the serialised
/// bytes are not valid UTF-8, `source` is not a known source, the database
/// cannot be opened, or the insert fails.
pub fn cache_insert(path: &PathBuf, data: &Data, source: &str) -> Result<(), AppError> {
    let serialized = match commonmeta::write(source, data) {
        Ok(bytes) => bytes,
        Err(err) => return Err(AppError::Internal(format!("serialize to {source}: {err}"))),
    };
    let payload = match String::from_utf8(serialized) {
        Ok(text) => text,
        Err(err) => return Err(AppError::Internal(format!("utf8: {err}"))),
    };
    let source_id = source_to_id(source)?;

    let db = connect_rw(path)?;
    let outcome = db.execute(
        "INSERT OR REPLACE INTO pid_records\n\
         (pid, source_id, resource_url, raw_metadata, raw_metadata_type)\n\
         VALUES (?1, ?2, ?3, ?4, ?5)",
        params![data.id, source_id, data.url, payload, source],
    );
    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(AppError::Internal(format!("cache insert: {err}"))),
    }
}
/// Unit tests for the works lookup and the `pid_records` cache, plus fixture
/// builders that are `pub(crate)` so other test modules can use them.
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::path::Path;

    /// Creates a cache database at `path` via `cache_open` and seeds it with a
    /// single crossref record whose id is `https://doi.org/10.5678/cached`.
    pub(crate) fn make_test_cache_db(path: &Path) -> PathBuf {
        let db_path = cache_open(path).expect("cache_open");
        let data = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Cached Article".to_string(),
            date_published: "2024-06-01".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&db_path, &data, "crossref").expect("cache_insert");
        db_path
    }

    /// Creates a works database at `path` from `TEST_DDL` and inserts one row
    /// (`https://doi.org/10.1234/test`) whose `metadata` blob is the
    /// zstd-compressed commonmeta serialisation of the record.
    pub(crate) fn make_test_db(path: &Path) -> PathBuf {
        let conn = Connection::open(path).expect("open test db");
        conn.execute_batch(TEST_DDL).expect("create schema");
        let data = commonmeta::Data {
            id: "https://doi.org/10.1234/test".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/test-article".to_string(),
            title: "Test Article on Content Negotiation".to_string(),
            date_published: "2024-01-15".to_string(),
            provider: "Crossref".to_string(),
            ..commonmeta::Data::default()
        };
        let json_bytes = commonmeta::write("commonmeta", &data).unwrap();
        let compressed = zstd::encode_all(json_bytes.as_slice(), 0).unwrap();
        conn.execute(
            r#"INSERT INTO works ("id","type","url","title","date_published","provider","metadata")
VALUES (?1,?2,?3,?4,?5,?6,?7)"#,
            params![
                &data.id, &data.type_, &data.url, &data.title,
                &data.date_published, &data.provider, compressed,
            ],
        )
        .expect("insert test record");
        path.to_path_buf()
    }

    /// Returns the column names of `pid_records` in the database at `path`.
    ///
    /// Row errors fail the calling test instead of being skipped, so a broken
    /// schema query cannot look like a list of missing columns.
    fn pid_records_columns(path: &Path) -> Vec<String> {
        let conn = Connection::open(path).expect("open cache db");
        let mut stmt = conn
            .prepare("PRAGMA table_info(pid_records)")
            .expect("prepare table_info");
        // Bound to a local so the row iterator is dropped before `stmt`.
        let columns = stmt
            .query_map([], |row| row.get::<_, String>(1))
            .expect("query table_info")
            .collect::<Result<Vec<_>, _>>()
            .expect("read column name");
        columns
    }

    /// A path that does not exist yields `Ok(None)` rather than an error.
    #[test]
    fn open_returns_none_for_missing_file() {
        let result = open(Path::new("/nonexistent/path/db.sqlite3"));
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    /// A valid database without a `works` table yields `Ok(None)`.
    #[test]
    fn open_returns_none_when_no_works_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("no_works.sqlite3");
        let conn = Connection::open(&path).unwrap();
        conn.execute_batch("CREATE TABLE organizations (id TEXT PRIMARY KEY, name TEXT);")
            .unwrap();
        drop(conn);
        let result = open(&path);
        assert!(matches!(result, Ok(None)), "expected Ok(None), got {result:?}");
    }

    /// A DOI absent from the works table is reported as `None`.
    #[test]
    fn lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let result = lookup(&path, "10.9999/does-not-exist").unwrap();
        assert!(result.is_none());
    }

    /// The fixture record round-trips through compression and parsing.
    #[test]
    fn lookup_finds_existing_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        let data = lookup(&path, "10.1234/test").unwrap().expect("should find DOI");
        assert_eq!(data.id, "https://doi.org/10.1234/test");
        assert_eq!(data.title, "Test Article on Content Negotiation");
        assert_eq!(data.url, "https://example.com/test-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    /// Bare, `https://doi.org/` and `http://dx.doi.org/` forms all resolve.
    #[test]
    fn lookup_normalises_doi_prefix_form() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        for doi in &[
            "10.1234/test",
            "https://doi.org/10.1234/test",
            "http://dx.doi.org/10.1234/test",
        ] {
            assert!(
                lookup(&path, doi).unwrap().is_some(),
                "should find DOI in form '{doi}'"
            );
        }
    }

    /// An empty DOI string is treated as "not found".
    #[test]
    fn lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_db(&dir.path().join("test.sqlite3"));
        assert!(lookup(&path, "").unwrap().is_none());
    }

    /// `cache_open` creates `pid_records` with the columns the cache code uses.
    #[test]
    fn cache_open_creates_pid_records_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let cols = pid_records_columns(&path);
        for col in &["pid", "source_id", "resource_url", "last_fetched", "raw_metadata", "raw_metadata_type"] {
            assert!(cols.iter().any(|c| c == col), "missing column: {col}");
        }
    }

    /// The cache schema keeps the columns required by the vraix transport table.
    #[test]
    fn cache_schema_compatible_with_vraix_transport_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let cols = pid_records_columns(&path);
        for required in &["pid", "source_id", "raw_metadata"] {
            assert!(cols.iter().any(|c| c == required), "missing vraix-required column: {required}");
        }
    }

    /// A DOI that was never cached is reported as `None`.
    #[test]
    fn cache_lookup_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "10.9999/nope").unwrap().is_none());
    }

    /// The seeded cache record round-trips through serialisation and parsing.
    #[test]
    fn cache_lookup_finds_inserted_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should find cached DOI");
        assert_eq!(data.id, "https://doi.org/10.5678/cached");
        assert_eq!(data.title, "Cached Article");
        assert_eq!(data.url, "https://example.com/cached-article");
        assert_eq!(data.type_, "JournalArticle");
    }

    /// An empty DOI string is treated as "not cached".
    #[test]
    fn cache_lookup_empty_string_returns_none() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup(&path, "").unwrap().is_none());
    }

    /// The URL lookup returns the stored `resource_url` column.
    #[test]
    fn cache_lookup_url_returns_url_without_json_parsing() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let url = cache_lookup_url(&path, "10.5678/cached").unwrap();
        assert_eq!(url.as_deref(), Some("https://example.com/cached-article"));
    }

    /// The URL lookup reports `None` for a DOI that was never cached.
    #[test]
    fn cache_lookup_url_returns_none_for_unknown_doi() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        assert!(cache_lookup_url(&path, "10.9999/nope").unwrap().is_none());
    }

    /// A source name outside the known set is rejected.
    #[test]
    fn cache_insert_unknown_source_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("cache.sqlite3");
        cache_open(&path).unwrap();
        let data = commonmeta::Data {
            id: "https://doi.org/10.1/x".to_string(),
            ..commonmeta::Data::default()
        };
        assert!(cache_insert(&path, &data, "openalex").is_err());
    }

    /// Inserting a record with an existing `pid` overwrites the earlier row.
    #[test]
    fn cache_insert_replaces_existing_record() {
        let dir = tempfile::tempdir().unwrap();
        let path = make_test_cache_db(&dir.path().join("cache.sqlite3"));
        let updated = commonmeta::Data {
            id: "https://doi.org/10.5678/cached".to_string(),
            type_: "JournalArticle".to_string(),
            url: "https://example.com/cached-article".to_string(),
            title: "Updated Title".to_string(),
            ..commonmeta::Data::default()
        };
        cache_insert(&path, &updated, "crossref").unwrap();
        let data = cache_lookup(&path, "10.5678/cached").unwrap().expect("should still exist");
        assert_eq!(data.title, "Updated Title");
    }
}