use std::path::Path;
use serde::Serialize;
use serde_json::Value;
use url::Url;
use fluent_uri::Uri as FUri;
use crate::data::Data;
use crate::error::{sqlite_err, Error, Result};
use crate::schema_utils::json_schema_errors;
use crate::utils::{normalize_id, normalize_ror};
/// Identifier of the commonmeta v1.0 JSON schema.
///
/// `prepare` stores it in every record's `schema_version`, and `looks_like_v1` treats
/// an input carrying exactly this `schema_version` as v1.0.
const COMMONMETA_V1_SCHEMA_URL: &str = "https://commonmeta.org/commonmeta_v1.0.json";
pub fn read(json: &str) -> Result<Data> {
let value: Value = serde_json::from_str(json).map_err(|e| Error::Parse(e.to_string()))?;
if !looks_like_v1(&value) {
return Err(Error::Parse(
"commonmeta input is not schema v1.0 shaped".to_string(),
));
}
serde_json::from_value(value).map_err(|e| Error::Parse(e.to_string()))
}
pub fn prepare(data: &Data) -> Data {
let mut out = data.clone();
out.schema_version = COMMONMETA_V1_SCHEMA_URL.to_string();
if !out.id.is_empty() {
out.id = normalize_id(&out.id);
}
if out.type_.is_empty() {
out.type_ = "Other".to_string();
}
for c in &mut out.contributors {
if let Some(p) = c.person.as_mut() {
p.affiliations.retain(|a| !a.id.is_empty() || !a.name.is_empty());
}
if let Some(org) = &c.organization {
if org.id.is_empty() && org.name.is_empty() {
c.organization = None;
}
}
}
out.contributors.retain(|c| match c.type_.as_str() {
"Person" => c.person.is_some(),
"Organization" => c.organization.is_some(),
_ => true,
});
out.identifiers.retain(|i| i.identifier != out.id);
for r in &mut out.references {
r.publisher.clear();
r.publication_year.clear();
r.volume.clear();
r.issue.clear();
r.first_page.clear();
r.last_page.clear();
r.unstructured.clear();
}
if !out.publisher.id.is_empty() && normalize_ror(&out.publisher.id).is_empty() {
out.publisher.id.clear();
}
for c in &mut out.contributors {
if let Some(p) = &mut c.person {
for aff in &mut p.affiliations {
if !aff.id.is_empty() && normalize_ror(&aff.id).is_empty() {
aff.id.clear();
}
}
}
if let Some(org) = &mut c.organization {
if !org.id.is_empty() && normalize_ror(&org.id).is_empty() {
org.id.clear();
}
}
}
for r in &mut out.references {
if !r.id.is_empty() {
let normalized = normalize_id(&r.id);
r.id = if normalized.is_empty() {
String::new()
} else {
let candidate = match Url::parse(&normalized) {
Ok(u) => {
let s = u.to_string();
if s.ends_with('/') { s[..s.len() - 1].to_string() } else { s }
}
Err(_) => String::new(),
};
if candidate.is_empty() || FUri::parse(candidate.as_str()).is_err() {
String::new()
} else {
candidate
}
};
}
}
out.references.retain(|r| {
!r.id.is_empty() || !r.key.is_empty() || !r.reference.is_empty() || !r.title.is_empty()
});
if !out.license.url.is_empty() && FUri::parse(out.license.url.as_str()).is_err() {
out.license.url = String::new();
}
out.files.retain(|f| !f.url.is_empty() && FUri::parse(f.url.as_str()).is_ok());
if !out.url.is_empty() && FUri::parse(out.url.as_str()).is_err() {
out.url = String::new();
}
{
let mut seen = std::collections::HashSet::new();
out.geo_locations.retain(|g| {
let key = serde_json::to_string(g).unwrap_or_default();
seen.insert(key)
});
}
for id in &mut out.identifiers {
let known = matches!(id.identifier_type.as_str(),
"ARK" | "arXiv" | "article_id" | "Bibcode" | "DOI" | "GeoNames" | "Handle"
| "ISBN" | "ISSN" | "OpenAlex" | "PMID" | "PMCID" | "PURL" | "RAiD" | "SWHID"
| "URL" | "URN" | "UUID" | "GUID" | "Other"
);
if !known {
if id.scheme.is_empty() {
id.scheme = std::mem::take(&mut id.identifier_type);
}
id.identifier_type = "Other".to_string();
}
}
{
let raw = std::mem::take(&mut out.container.identifier_type);
let known = matches!(raw.as_str(),
"ARK" | "arXiv" | "article_id" | "Bibcode" | "DOI" | "GeoNames" | "Handle"
| "ISBN" | "ISSN" | "OpenAlex" | "PMID" | "PMCID" | "PURL" | "RAiD" | "SWHID"
| "URL" | "URN" | "UUID" | "GUID" | "Other"
);
match raw.as_str() {
"EISSN" | "PISSN" => out.container.identifier_type = "ISSN".to_string(),
_ if known => out.container.identifier_type = raw,
_ if !raw.is_empty() => {
if out.container.scheme.is_empty() {
out.container.scheme = raw;
}
out.container.identifier_type = "Other".to_string();
}
_ => {}
}
}
out.additional_titles.retain(|t| !t.title.is_empty());
for f in &mut out.funding_references {
if !f.funder_id.is_empty() && FUri::parse(f.funder_id.as_str()).is_err() {
f.funder_id = String::new();
}
}
out.funding_references.retain(|f| {
!f.funder_name.is_empty()
|| !f.funder_id.is_empty()
|| !f.award_number.is_empty()
|| !f.award_title.is_empty()
});
for rel in &mut out.relations {
let normalized = normalize_id(&rel.id);
rel.id = normalized;
}
out.relations.retain(|r| !r.id.is_empty());
out
}
pub fn write(data: &Data) -> Result<Vec<u8>> {
let out = prepare(data);
let bytes = serde_json::to_vec(&out).map_err(|e| Error::Serialize(e.to_string()))?;
if !matches!(out.type_.as_str(), "Person" | "Organization") {
json_schema_errors(&bytes, Some("commonmeta"))?;
}
Ok(bytes)
}
pub fn write_all(list: &[Data]) -> Result<Vec<u8>> {
let prepared: Vec<Data> = list.iter().map(prepare).collect();
let bytes =
serde_json::to_vec_pretty(&prepared).map_err(|e| Error::Serialize(e.to_string()))?;
json_schema_errors(&bytes, Some("commonmeta"))?;
Ok(bytes)
}
/// Heuristically decides whether `value` is a commonmeta v1.0 record.
///
/// Non-objects always yield `false`. An object qualifies when any of these holds:
/// - it declares the v1.0 `schema_version`;
/// - it carries any of the top-level keys `date_published`, `additional_titles` or
///   `additional_descriptions`;
/// - its *first* `identifiers` entry has an `identifier_type` key;
/// - its *first* `contributors` entry has a `person` or `organization` key.
fn looks_like_v1(value: &Value) -> bool {
    let obj = match value.as_object() {
        Some(obj) => obj,
        None => return false,
    };

    if obj.get("schema_version").and_then(Value::as_str) == Some(COMMONMETA_V1_SCHEMA_URL) {
        return true;
    }
    const V1_KEYS: [&str; 3] = ["date_published", "additional_titles", "additional_descriptions"];
    if V1_KEYS.iter().any(|key| obj.contains_key(*key)) {
        return true;
    }

    // First element of an array-valued field, if that element is itself an object.
    let first_entry = |field: &str| {
        obj.get(field)
            .and_then(Value::as_array)
            .and_then(|items| items.first())
            .and_then(Value::as_object)
    };
    first_entry("identifiers").is_some_and(|entry| entry.contains_key("identifier_type"))
        || first_entry("contributors").is_some_and(|entry| {
            entry.contains_key("person") || entry.contains_key("organization")
        })
}
/// Flat, columnar representation of one record used for parquet output.
///
/// Produced by `flatten_row` and read back by `unflatten_row`; the `json` column keeps
/// the full record so the round trip does not depend on the lossy scalar columns.
#[derive(
    Debug,
    Default,
    Clone,
    Serialize,
    parquet_derive::ParquetRecordWriter,
    parquet_derive::ParquetRecordReader,
)]
pub struct CommonmetaRow {
    // Record `id`.
    pub id: String,
    // Record `type_`.
    pub record_type: String,
    pub title: String,
    pub url: String,
    // First identifier typed "DOI"; otherwise the record id if it contains "doi.org";
    // otherwise empty.
    pub doi: String,
    // `publisher.name`.
    pub publisher: String,
    pub language: String,
    pub version: String,
    // `license.id` (not the license URL).
    pub license: String,
    // The container/volume/issue/page columns come from `container.*`.
    pub container_title: String,
    pub container_type: String,
    pub volume: String,
    pub issue: String,
    pub first_page: String,
    pub last_page: String,
    pub date_published: String,
    // `dates.created`.
    pub date_created: String,
    pub date_updated: String,
    // Number of contributors, cast with `as i32`.
    pub contributor_count: i32,
    // `name()` of the first contributor.
    pub first_author_name: String,
    // `id()` of the first contributor. NOTE(review): not guaranteed to be an ORCID iD
    // despite the column name.
    pub first_author_orcid: String,
    // Subject labels joined with "; ".
    pub subjects: String,
    pub description: String,
    pub provider: String,
    pub additional_type: String,
    // Full record serialized as JSON (empty if serialization failed).
    pub json: String,
}
/// Flattens one record into a [`CommonmetaRow`] for parquet output.
///
/// Selected scalar fields get their own columns. `doi` is the first identifier typed
/// `DOI`, else `data.id` when it contains `doi.org`, else empty. Subject labels are
/// joined with `"; "`. The whole record is also serialized into the `json` column
/// (empty if serialization fails).
fn flatten_row(data: &Data) -> CommonmetaRow {
    let doi = match data.identifiers.iter().find(|i| i.identifier_type == "DOI") {
        Some(found) => found.identifier.clone(),
        None if data.id.contains("doi.org") => data.id.clone(),
        None => String::new(),
    };
    let (first_author_name, first_author_orcid) = match data.contributors.first() {
        Some(first) => (first.name(), first.id().to_string()),
        None => (String::new(), String::new()),
    };
    let mut subjects = String::new();
    for (position, s) in data.subjects.iter().enumerate() {
        if position > 0 {
            subjects.push_str("; ");
        }
        subjects.push_str(&s.subject);
    }
    let container = &data.container;
    CommonmetaRow {
        id: data.id.clone(),
        record_type: data.type_.clone(),
        title: data.title.clone(),
        url: data.url.clone(),
        doi,
        publisher: data.publisher.name.clone(),
        language: data.language.clone(),
        version: data.version.clone(),
        license: data.license.id.clone(),
        container_title: container.title.clone(),
        container_type: container.type_.clone(),
        volume: container.volume.clone(),
        issue: container.issue.clone(),
        first_page: container.first_page.clone(),
        last_page: container.last_page.clone(),
        date_published: data.date_published.clone(),
        date_created: data.dates.created.clone(),
        date_updated: data.date_updated.clone(),
        contributor_count: data.contributors.len() as i32,
        first_author_name,
        first_author_orcid,
        subjects,
        description: data.description.clone(),
        provider: data.provider.clone(),
        additional_type: data.additional_type.clone(),
        json: serde_json::to_string(data).unwrap_or_default(),
    }
}
/// Records per parquet row group written by [`write_parquet_all`].
const ROW_GROUP_SIZE: usize = 100_000;

/// Serializes `list` into an in-memory parquet file, [`ROW_GROUP_SIZE`] records per
/// row group.
///
/// # Errors
/// Propagates flattening and parquet writer failures as [`Error::Serialize`].
pub fn write_parquet_all(list: &[Data]) -> Result<Vec<u8>> {
    let group_size = ROW_GROUP_SIZE;
    write_parquet_chunked(list, group_size)
}
/// Writes `list` as an in-memory parquet file with at most `row_group_size` records
/// per row group.
///
/// Rows are flattened in parallel (one scoped thread per row group) and then written
/// sequentially in input order. An empty `list` still produces a file with one empty
/// row group, because the parquet schema is taken from the first row slice. A
/// `row_group_size` of 0 is treated as 1; `slice::chunks` would otherwise panic.
///
/// # Errors
/// Returns [`Error::Serialize`] if a flattening thread panics or the parquet writer fails.
fn write_parquet_chunked(list: &[Data], row_group_size: usize) -> Result<Vec<u8>> {
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::record::RecordWriter;

    let row_group_size = row_group_size.max(1);
    // Keep one (empty) chunk for empty input so `row_chunks[0]` below always exists.
    let chunks: Vec<&[Data]> = if list.is_empty() {
        vec![&[][..]]
    } else {
        list.chunks(row_group_size).collect()
    };
    let row_chunks: Vec<Vec<CommonmetaRow>> = std::thread::scope(|scope| {
        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| scope.spawn(move || chunk.iter().map(flatten_row).collect::<Vec<_>>()))
            .collect();
        handles
            .into_iter()
            .map(|h| {
                h.join()
                    .map_err(|_| Error::Serialize("parquet flatten thread panicked".to_string()))
            })
            .collect::<Result<Vec<_>>>()
    })?;
    let schema = row_chunks[0]
        .as_slice()
        .schema()
        .map_err(|e| Error::Serialize(e.to_string()))?;
    let props = std::sync::Arc::new(WriterProperties::builder().build());
    let buffer: Vec<u8> = Vec::new();
    let mut writer = SerializedFileWriter::new(buffer, schema, props)
        .map_err(|e| Error::Serialize(e.to_string()))?;
    for rows in &row_chunks {
        let mut row_group = writer
            .next_row_group()
            .map_err(|e| Error::Serialize(e.to_string()))?;
        rows.as_slice()
            .write_to_row_group(&mut row_group)
            .map_err(|e| Error::Serialize(e.to_string()))?;
        row_group
            .close()
            .map_err(|e| Error::Serialize(e.to_string()))?;
    }
    writer
        .into_inner()
        .map_err(|e| Error::Serialize(e.to_string()))
}
/// Rebuilds a record from a parquet row.
///
/// Prefers the full JSON snapshot in `row.json`. Falls back to
/// [`unflatten_row_lossy`] when that column is empty or does not deserialize.
fn unflatten_row(row: &CommonmetaRow) -> Data {
    if row.json.is_empty() {
        return unflatten_row_lossy(row);
    }
    match serde_json::from_str::<Data>(&row.json) {
        Ok(data) => data,
        Err(_) => unflatten_row_lossy(row),
    }
}
fn unflatten_row_lossy(row: &CommonmetaRow) -> Data {
Data {
id: row.id.clone(),
type_: row.record_type.clone(),
additional_type: row.additional_type.clone(),
title: row.title.clone(),
url: row.url.clone(),
identifiers: if row.doi.is_empty() {
Vec::new()
} else {
vec![crate::data::Identifier {
identifier: row.doi.clone(),
identifier_type: "DOI".to_string(),
..Default::default()
}]
},
publisher: crate::data::Publisher {
name: row.publisher.clone(),
..Default::default()
},
language: row.language.clone(),
version: row.version.clone(),
license: crate::data::License {
id: row.license.clone(),
..Default::default()
},
container: crate::data::Container {
title: row.container_title.clone(),
type_: row.container_type.clone(),
volume: row.volume.clone(),
issue: row.issue.clone(),
first_page: row.first_page.clone(),
last_page: row.last_page.clone(),
..Default::default()
},
date_published: row.date_published.clone(),
date_updated: row.date_updated.clone(),
dates: crate::data::Dates {
created: row.date_created.clone(),
..Default::default()
},
contributors: if row.first_author_name.is_empty() && row.first_author_orcid.is_empty() {
Vec::new()
} else {
vec![crate::data::Contributor::person(
crate::data::Person {
id: row.first_author_orcid.clone(),
..Default::default()
},
Vec::new(),
)]
},
subjects: row
.subjects
.split("; ")
.filter(|s| !s.is_empty())
.map(|s| crate::data::Subject {
subject: s.to_string(),
..Default::default()
})
.collect(),
description: row.description.clone(),
provider: row.provider.clone(),
..Default::default()
}
}
/// Schema script that `init_sqlite_writer` runs each time it opens a database for writing.
///
/// Apart from `PRAGMA synchronous=NORMAL`, every statement is `IF NOT EXISTS`. It creates:
/// - the `settings` key/value table;
/// - the `works` table (`metadata` holds the blob built by `serialize_to_row`) and its
///   lookup indexes;
/// - the `works_orcid` / `works_ror` / `works_references` junction tables;
/// - the external-content FTS5 table `works_fts` over `title` and `subjects`.
///
/// On an older `works` table, the indexes on `pmid`/`pmcid`/`openalex`/`arxiv` need
/// those columns, which is why `init_sqlite_writer` runs the `ALTER TABLE` migrations
/// first.
/// NOTE(review): no triggers keep `works_fts` in sync with `works`; it is presumably
/// refreshed via `rebuild_works_fts` — confirm.
const SQLITE_DDL: &str = r#"PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS settings (
"key" TEXT PRIMARY KEY NOT NULL,
"value" TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"pmid" TEXT NOT NULL DEFAULT '',
"pmcid" TEXT NOT NULL DEFAULT '',
"openalex" TEXT NOT NULL DEFAULT '',
"arxiv" TEXT NOT NULL DEFAULT '',
"valid" INTEGER NOT NULL DEFAULT 0,
"metadata" BLOB NOT NULL DEFAULT x''
);
CREATE INDEX IF NOT EXISTS works_type ON works("type");
CREATE INDEX IF NOT EXISTS works_date_published ON works("date_published");
CREATE INDEX IF NOT EXISTS works_date_updated ON works("date_updated");
CREATE INDEX IF NOT EXISTS works_provider ON works("provider");
CREATE INDEX IF NOT EXISTS works_pmid ON works("pmid") WHERE "pmid" != '';
CREATE INDEX IF NOT EXISTS works_pmcid ON works("pmcid") WHERE "pmcid" != '';
CREATE INDEX IF NOT EXISTS works_openalex ON works("openalex") WHERE "openalex" != '';
CREATE INDEX IF NOT EXISTS works_arxiv ON works("arxiv") WHERE "arxiv" != '';
CREATE TABLE IF NOT EXISTS works_orcid (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"orcid" TEXT NOT NULL,
PRIMARY KEY ("work_id", "orcid")
);
CREATE INDEX IF NOT EXISTS works_orcid_orcid ON works_orcid("orcid");
CREATE TABLE IF NOT EXISTS works_ror (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"ror" TEXT NOT NULL,
PRIMARY KEY ("work_id", "ror")
);
CREATE INDEX IF NOT EXISTS works_ror_ror ON works_ror("ror");
CREATE TABLE IF NOT EXISTS works_references (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"ref_id" TEXT NOT NULL,
PRIMARY KEY ("work_id", "ref_id")
);
CREATE INDEX IF NOT EXISTS works_references_ref_id ON works_references("ref_id");
CREATE VIRTUAL TABLE IF NOT EXISTS works_fts USING fts5(title, subjects, content="works", content_rowid="rowid", tokenize="unicode61 remove_diacritics 1");"#;
/// Standalone `works_fts` definition used by `rebuild_works_fts`. It must stay
/// identical to the last statement of `SQLITE_DDL`.
const WORKS_FTS5_DDL: &str = "CREATE VIRTUAL TABLE IF NOT EXISTS works_fts USING fts5(\
title, subjects, \
content=\"works\", \
content_rowid=\"rowid\", \
tokenize=\"unicode61 remove_diacritics 1\"\
)";
/// Value written to `PRAGMA user_version` once the migrations below have been applied.
const SCHEMA_VERSION: u32 = 5;
/// Upgrade statements for databases created by older versions.
///
/// Callers do not run this list in order. `init_sqlite_writer` and `run_migrations`
/// select statements by prefix (`ALTER TABLE`, `CREATE TABLE`, `CREATE INDEX`,
/// `CREATE VIRTUAL TABLE`). They treat "duplicate column name" / "already exists"
/// errors as steps that were already applied.
const SQLITE_MIGRATIONS: &[&str] = &[
r#"ALTER TABLE works ADD COLUMN "pmid" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "pmcid" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "openalex" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "arxiv" TEXT NOT NULL DEFAULT ''"#,
r#"CREATE INDEX IF NOT EXISTS works_arxiv ON works("arxiv") WHERE "arxiv" != ''"#,
r#"CREATE TABLE IF NOT EXISTS works_orcid (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"orcid" TEXT NOT NULL,
PRIMARY KEY ("work_id", "orcid")
)"#,
r#"CREATE INDEX IF NOT EXISTS works_orcid_orcid ON works_orcid("orcid")"#,
r#"CREATE TABLE IF NOT EXISTS works_ror (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"ror" TEXT NOT NULL,
PRIMARY KEY ("work_id", "ror")
)"#,
r#"CREATE INDEX IF NOT EXISTS works_ror_ror ON works_ror("ror")"#,
r#"CREATE TABLE IF NOT EXISTS works_references (
"work_id" TEXT NOT NULL REFERENCES works("id") ON DELETE CASCADE,
"ref_id" TEXT NOT NULL,
PRIMARY KEY ("work_id", "ref_id")
)"#,
r#"CREATE INDEX IF NOT EXISTS works_references_ref_id ON works_references("ref_id")"#,
r#"CREATE VIRTUAL TABLE IF NOT EXISTS works_fts USING fts5(title, subjects, content="works", content_rowid="rowid", tokenize="unicode61 remove_diacritics 1")"#,
];
/// Upsert of one `works` row. Parameters `?1`..`?15` follow the column list order.
/// With `OR REPLACE`, an existing row with the same `id` is replaced.
const SQLITE_INSERT: &str = r#"INSERT OR REPLACE INTO works (
"id", "type", "url", "title", "subjects",
"language", "date_published", "date_updated", "provider",
"pmid", "pmcid", "openalex", "arxiv", "valid", "metadata"
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)"#;
/// In-place update of the `works` row whose `id` is `?1`. Parameter numbering matches
/// `SQLITE_INSERT`.
const SQLITE_UPDATE: &str = r#"UPDATE works SET
"type" = ?2, "url" = ?3, "title" = ?4, "subjects" = ?5,
"language" = ?6, "date_published" = ?7, "date_updated" = ?8, "provider" = ?9,
"pmid" = ?10, "pmcid" = ?11, "openalex" = ?12, "arxiv" = ?13, "valid" = ?14, "metadata" = ?15
WHERE "id" = ?1"#;
/// Column values for one `works` row plus its junction-table entries, as produced by
/// `serialize_to_row` and written by `write_rows_in_tx`.
pub struct PreparedRow {
    /// Normalized record id (primary key of `works`).
    pub id: String,
    pub type_: String,
    pub url: String,
    pub title: String,
    /// JSON array of the record's subjects.
    pub subjects: String,
    pub language: String,
    pub date_published: String,
    pub date_updated: String,
    pub provider: String,
    /// Normalized PMID, or empty.
    pub pmid: String,
    /// Normalized PMCID, or empty.
    pub pmcid: String,
    /// OpenAlex identifier as stored on the record, or empty.
    pub openalex: String,
    /// Normalized arXiv id, or empty.
    pub arxiv: String,
    /// Stored in the `valid` column as 0/1.
    pub valid: bool,
    /// JSON of the prepared record, zstd-compressed (raw JSON if compression failed).
    pub metadata: Vec<u8>,
    /// Distinct contributor ORCID iDs (`https://orcid.org/…`) for `works_orcid`.
    pub orcids: Vec<String>,
    /// Distinct ROR ids (`https://ror.org/…`) for `works_ror`.
    pub rors: Vec<String>,
    /// Reference DOIs (`https://doi.org/…`) for `works_references`.
    pub ref_ids: Vec<String>,
}
/// Converts a record into the column values of one `works` row plus its junction
/// table entries.
///
/// The record is first normalized with [`prepare`]. Notes on the output:
/// - `orcids` and `rors` are de-duplicated, keeping first-seen order.
/// - `ref_ids` keeps only `https://doi.org/` reference ids; duplicates are left to the
///   `INSERT OR IGNORE` on write.
/// - `metadata` is the zstd-compressed JSON of the prepared record, or the raw JSON if
///   compression fails.
/// - `valid` is always `false`.
pub fn serialize_to_row(data: Data) -> PreparedRow {
    let data = prepare(&data);
    let subjects = serde_json::to_string(&data.subjects).unwrap_or_default();
    let pmid = data
        .identifiers
        .iter()
        .find(|i| i.identifier_type == "PMID")
        .and_then(|i| crate::utils::normalize_pmid(&i.identifier, crate::utils::PmcResolver::Ncbi))
        .unwrap_or_default();
    let pmcid = data
        .identifiers
        .iter()
        .find(|i| i.identifier_type == "PMCID")
        .and_then(|i| crate::utils::normalize_pmcid(&i.identifier, crate::utils::PmcResolver::Ncbi))
        .unwrap_or_default();
    let openalex = data
        .identifiers
        .iter()
        .find(|i| i.identifier_type == "OpenAlex")
        .map(|i| i.identifier.clone())
        .unwrap_or_default();
    let arxiv = data
        .identifiers
        .iter()
        .find(|i| i.identifier_type == "arXiv")
        .and_then(|i| crate::utils::normalize_arxiv(&i.identifier))
        .unwrap_or_default();

    // HashSets keep de-duplication linear. The previous `Vec::contains` scan was
    // quadratic, which is noticeable for records with thousands of authors.
    let (orcids, rors) = {
        let mut orcids: Vec<String> = Vec::new();
        let mut rors: Vec<String> = Vec::new();
        let mut seen_orcids: std::collections::HashSet<&str> = std::collections::HashSet::new();
        let mut seen_rors: std::collections::HashSet<&str> = std::collections::HashSet::new();
        for c in &data.contributors {
            if let Some(p) = &c.person {
                if p.id.starts_with("https://orcid.org/") && seen_orcids.insert(p.id.as_str()) {
                    orcids.push(p.id.clone());
                }
                for aff in &p.affiliations {
                    if aff.id.starts_with("https://ror.org/") && seen_rors.insert(aff.id.as_str()) {
                        rors.push(aff.id.clone());
                    }
                }
            }
            if let Some(org) = &c.organization {
                if org.id.starts_with("https://ror.org/") && seen_rors.insert(org.id.as_str()) {
                    rors.push(org.id.clone());
                }
            }
        }
        (orcids, rors)
    };
    let ref_ids: Vec<String> = data
        .references
        .iter()
        .filter(|r| r.id.starts_with("https://doi.org/"))
        .map(|r| r.id.clone())
        .collect();

    let json = serde_json::to_string(&data).unwrap_or_default();
    let metadata = zstd::encode_all(json.as_bytes(), 0).unwrap_or_else(|_| json.into_bytes());
    PreparedRow {
        id: data.id,
        type_: data.type_,
        url: data.url,
        title: data.title,
        subjects,
        language: data.language,
        date_published: data.date_published,
        date_updated: data.date_updated,
        provider: data.provider,
        pmid,
        pmcid,
        openalex,
        arxiv,
        valid: false,
        metadata,
        orcids,
        rors,
        ref_ids,
    }
}
pub(crate) fn init_sqlite_writer(path: &Path, overwrite: bool) -> Result<rusqlite::Connection> {
if overwrite && path.exists() {
std::fs::remove_file(path)
.map_err(|e| Error::Parse(format!("failed to remove '{}': {}", path.display(), e)))?;
}
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open sqlite '{}': {}", path.display(), e)))?;
let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
.map_err(|e| Error::Parse(format!("failed to set WAL mode: {}", e)))?;
let version: u32 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
let works_exists: bool = conn
.query_row(
"SELECT COUNT(rowid) FROM sqlite_master WHERE type='table' AND name='works'",
[],
|r| r.get::<_, i64>(0),
)
.unwrap_or(0) > 0;
if version < SCHEMA_VERSION && works_exists {
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("ALTER TABLE")) {
if let Err(e) = conn.execute(stmt, []) {
if !e.to_string().contains("duplicate column name") {
return Err(Error::Parse(format!("schema migration: {e}")));
}
}
}
}
conn.execute_batch(SQLITE_DDL)
.map_err(|e| Error::Parse(format!("failed to create works table: {}", e)))?;
if version < SCHEMA_VERSION {
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| !s.starts_with("ALTER TABLE")) {
if let Err(e) = conn.execute(stmt, []) {
let msg = e.to_string();
if !msg.contains("duplicate column name") && !msg.contains("already exists") {
return Err(Error::Parse(format!("schema migration: {e}")));
}
}
}
conn.execute_batch(&format!("PRAGMA user_version = {SCHEMA_VERSION}"))
.map_err(|e| Error::Parse(format!("set user_version: {e}")))?;
}
Ok(conn)
}
pub fn run_migrations(path: &Path) -> Result<(usize, u32)> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open sqlite '{}': {}", path.display(), e)))?;
let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
.map_err(|e| Error::Parse(format!("failed to set WAL mode: {}", e)))?;
let version: u32 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
if version >= SCHEMA_VERSION {
return Ok((0, version));
}
let mut applied = 0;
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("ALTER TABLE")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", stmt);
match conn.execute(stmt, []) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("duplicate column name") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
{
let ck_start = std::time::Instant::now();
eprint!(" PRAGMA wal_checkpoint(PASSIVE) … ");
let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)");
eprintln!("{:.1?}", ck_start.elapsed());
}
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("CREATE TABLE")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", &stmt[..stmt.find('(').unwrap_or(stmt.len())].trim());
match conn.execute_batch(stmt) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("already exists") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("CREATE INDEX")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", stmt);
match conn.execute(stmt, []) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("already exists") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("CREATE VIRTUAL TABLE")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", &stmt[..stmt.find('(').unwrap_or(stmt.len())].trim());
match conn.execute_batch(stmt) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("already exists") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
conn.execute_batch(&format!("PRAGMA user_version = {SCHEMA_VERSION}"))
.map_err(|e| Error::Parse(format!("set user_version: {e}")))?;
Ok((applied, SCHEMA_VERSION))
}
/// Junction tables that `backfill_junction_tables` can populate.
///
/// Variant order matters: it defines the derived `Ord`, which fixes the order in which
/// table names appear in the backfill resume tag.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum JunctionTable {
    /// `works_orcid`: work → contributor ORCID iD.
    Orcid,
    /// `works_ror`: work → ROR id of an affiliation or organization.
    Ror,
    /// `works_references`: work → DOI of a cited reference.
    References,
}

impl JunctionTable {
    /// Short lowercase name used in backfill settings keys and log lines.
    fn name(self) -> &'static str {
        match self {
            JunctionTable::Orcid => "orcid",
            JunctionTable::Ror => "ror",
            JunctionTable::References => "references",
        }
    }
}
/// Populates the selected junction tables from the stored `metadata` of existing works.
///
/// Works are scanned in `rowid` order in batches of 50 000, optionally restricted to
/// `providers`. Each batch commits its junction inserts together with a resume cursor
/// stored in `settings`, under keys derived from the table/provider selection. An
/// interrupted run therefore resumes where it stopped. The cursor keys are removed
/// once the scan completes. Progress is printed to stderr.
///
/// Returns `(total_works, rows_indexed)`. `total_works` counts the matching works.
/// `rows_indexed` includes rows inserted by earlier, resumed runs.
///
/// # Panics
/// If `tables` is empty.
///
/// # Errors
/// Returns [`Error::Parse`] on open/PRAGMA/query failures, or when a stored blob cannot
/// be decompressed or deserialized.
pub fn backfill_junction_tables(
    path: &Path,
    tables: &[JunctionTable],
    providers: &[&str],
) -> Result<(usize, usize)> {
    /// Renders `value` as an SQL string literal. Embedded single quotes are doubled,
    /// so a provider name cannot terminate the literal and inject SQL.
    fn sql_literal(value: &str) -> String {
        format!("'{}'", value.replace('\'', "''"))
    }

    assert!(!tables.is_empty(), "backfill: at least one table required");
    let mut sorted = tables.to_vec();
    sorted.sort();
    sorted.dedup();
    let tables_tag: String = sorted.iter().map(|t| t.name()).collect::<Vec<_>>().join("_");
    let mut sorted_providers: Vec<&str> = providers.to_vec();
    sorted_providers.sort();
    sorted_providers.dedup();
    let providers_tag = if sorted_providers.is_empty() {
        String::new()
    } else {
        format!(
            "_{}",
            sorted_providers.iter().map(|p| p.to_lowercase()).collect::<Vec<_>>().join("_")
        )
    };
    let tag = format!("{tables_tag}{providers_tag}");
    let cursor_key = format!("backfill_{tag}_recid");
    let inserted_key = format!("backfill_{tag}_inserted");
    // Provider names are spliced into the SQL text because the statements already use
    // positional parameters for cursor and limit; `sql_literal` escapes them.
    let provider_clause = match sorted_providers.as_slice() {
        [] => String::new(),
        [p] => format!(r#" AND "provider" = {}"#, sql_literal(p)),
        ps => {
            let list = ps.iter().map(|p| sql_literal(p)).collect::<Vec<_>>().join(", ");
            format!(r#" AND "provider" IN ({list})"#)
        }
    };
    let do_orcid = sorted.contains(&JunctionTable::Orcid);
    let do_ror = sorted.contains(&JunctionTable::Ror);
    let do_refs = sorted.contains(&JunctionTable::References);
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("open sqlite '{}': {}", path.display(), e)))?;
    let _: String = conn
        .query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
        .map_err(|e| Error::Parse(format!("set WAL: {}", e)))?;
    conn.execute_batch("PRAGMA cache_size = -524288")
        .map_err(|e| Error::Parse(format!("set cache: {}", e)))?;
    // Integer value stored under `key` in `settings`, or 0 when missing or unparsable.
    let cursor_val = |key: &str| -> i64 {
        conn.query_row(
            r#"SELECT "value" FROM settings WHERE "key" = ?1"#,
            [key],
            |r| r.get::<_, String>(0),
        )
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(0)
    };
    let mut cursor: i64 = cursor_val(&cursor_key);
    let prev_inserted: usize = cursor_val(&inserted_key) as usize;
    if cursor > 0 {
        eprintln!("backfill ({tag}): resuming from rowid {cursor} ({prev_inserted} rows already indexed)");
    }
    let max_rowid: i64 = conn
        .query_row(
            &format!(r#"SELECT MAX(rowid) FROM works WHERE 1=1{provider_clause}"#),
            [],
            |r| r.get::<_, Option<i64>>(0),
        )
        .unwrap_or(None)
        .unwrap_or(0);
    let total_works: usize = conn
        .query_row(
            &format!(r#"SELECT COUNT(rowid) FROM works WHERE 1=1{provider_clause}"#),
            [],
            |r| r.get::<_, i64>(0),
        )
        .unwrap_or(0)
        .max(0) as usize;
    const BATCH: usize = 50_000;
    let mut scanned = 0usize;
    let mut inserted = 0usize;
    let start = std::time::Instant::now();
    let read_sql = format!(
        r#"SELECT rowid, "id", "metadata" FROM works WHERE rowid > ?1{provider_clause} ORDER BY rowid LIMIT ?2"#
    );
    let mut read_stmt = conn.prepare(&read_sql).map_err(|e| Error::Parse(e.to_string()))?;
    loop {
        let raw: Vec<(i64, String, Vec<u8>)> = {
            let mut rows = read_stmt
                .query(rusqlite::params![cursor, BATCH as i64])
                .map_err(|e| Error::Parse(e.to_string()))?;
            let mut v = Vec::with_capacity(BATCH);
            while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
                v.push((
                    row.get(0).map_err(|e| Error::Parse(e.to_string()))?,
                    row.get(1).map_err(|e| Error::Parse(e.to_string()))?,
                    row.get(2).map_err(|e| Error::Parse(e.to_string()))?,
                ));
            }
            v
        };
        if raw.is_empty() {
            break;
        }
        let batch_scanned = raw.len();
        let last_rowid = raw.last().map(|(r, _, _)| *r).unwrap_or(cursor);
        let mut orcid_pairs: Vec<(String, String)> = Vec::new();
        let mut ror_pairs: Vec<(String, String)> = Vec::new();
        let mut ref_pairs: Vec<(String, String)> = Vec::new();
        for (_, id, blob) in &raw {
            let dec = zstd::decode_all(std::io::Cursor::new(blob))
                .map_err(|e| Error::Parse(format!("decompress '{}': {}", id, e)))?;
            let data: Data = serde_json::from_slice(&dec)
                .map_err(|e| Error::Parse(format!("deserialize '{}': {}", id, e)))?;
            if do_orcid {
                for c in &data.contributors {
                    if let Some(p) = &c.person {
                        if p.id.starts_with("https://orcid.org/") {
                            orcid_pairs.push((id.clone(), p.id.clone()));
                        }
                    }
                }
            }
            if do_ror {
                for c in &data.contributors {
                    if let Some(p) = &c.person {
                        for aff in &p.affiliations {
                            if aff.id.starts_with("https://ror.org/") {
                                ror_pairs.push((id.clone(), aff.id.clone()));
                            }
                        }
                    }
                    if let Some(org) = &c.organization {
                        if org.id.starts_with("https://ror.org/") {
                            ror_pairs.push((id.clone(), org.id.clone()));
                        }
                    }
                }
            }
            if do_refs {
                for r in &data.references {
                    if r.id.starts_with("https://doi.org/") {
                        ref_pairs.push((id.clone(), r.id.clone()));
                    }
                }
            }
        }
        // Junction inserts and the resume cursor commit atomically per batch.
        let tx = conn
            .unchecked_transaction()
            .map_err(|e| Error::Parse(e.to_string()))?;
        let mut batch_inserted = 0usize;
        if do_orcid && !orcid_pairs.is_empty() {
            let mut s = tx
                .prepare(r#"INSERT OR IGNORE INTO works_orcid ("work_id","orcid") VALUES (?1,?2)"#)
                .map_err(|e| Error::Parse(e.to_string()))?;
            for (wid, oid) in &orcid_pairs {
                batch_inserted += s
                    .execute(rusqlite::params![wid, oid])
                    .map_err(|e| Error::Parse(e.to_string()))?;
            }
        }
        if do_ror && !ror_pairs.is_empty() {
            let mut s = tx
                .prepare(r#"INSERT OR IGNORE INTO works_ror ("work_id","ror") VALUES (?1,?2)"#)
                .map_err(|e| Error::Parse(e.to_string()))?;
            for (wid, ror) in &ror_pairs {
                batch_inserted += s
                    .execute(rusqlite::params![wid, ror])
                    .map_err(|e| Error::Parse(e.to_string()))?;
            }
        }
        if do_refs && !ref_pairs.is_empty() {
            let mut s = tx
                .prepare(r#"INSERT OR IGNORE INTO works_references ("work_id","ref_id") VALUES (?1,?2)"#)
                .map_err(|e| Error::Parse(e.to_string()))?;
            for (wid, rid) in &ref_pairs {
                batch_inserted += s
                    .execute(rusqlite::params![wid, rid])
                    .map_err(|e| Error::Parse(e.to_string()))?;
            }
        }
        inserted += batch_inserted;
        let total_inserted = prev_inserted + inserted;
        let upsert = r#"INSERT INTO settings ("key","value") VALUES (?1,?2)
ON CONFLICT("key") DO UPDATE SET "value" = excluded."value""#;
        tx.execute(upsert, rusqlite::params![cursor_key, last_rowid.to_string()])
            .map_err(|e| Error::Parse(e.to_string()))?;
        tx.execute(upsert, rusqlite::params![inserted_key, total_inserted.to_string()])
            .map_err(|e| Error::Parse(e.to_string()))?;
        tx.commit().map_err(|e| Error::Parse(e.to_string()))?;
        cursor = last_rowid;
        scanned += batch_scanned;
        let pct = if max_rowid > 0 { cursor * 100 / max_rowid } else { 100 };
        let secs = start.elapsed().as_secs();
        let rate = if secs > 0 { scanned / secs as usize } else { 0 };
        eprintln!(
            "backfill ({tag}): {scanned}/{total_works} works ({pct}%) — {total_inserted} rows indexed — {rate}/s — {:.0?} elapsed",
            start.elapsed(),
        );
    }
    // Scan finished: drop the resume cursor so a later run starts from the beginning.
    let _ = conn.execute(
        r#"DELETE FROM settings WHERE "key" IN (?1, ?2)"#,
        rusqlite::params![cursor_key, inserted_key],
    );
    let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)");
    Ok((total_works, prev_inserted + inserted))
}
/// Backfills only the `works_references` junction table, across all providers.
///
/// Thin wrapper around [`backfill_junction_tables`]; its `(total_works, rows_indexed)`
/// result is returned unchanged.
pub fn backfill_works_references(path: &Path) -> Result<(usize, usize)> {
    let only_references = [JunctionTable::References];
    let all_providers: &[&str] = &[];
    backfill_junction_tables(path, &only_references, all_providers)
}
pub fn rebuild_works_fts(path: &Path) -> Result<()> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("open sqlite '{}': {}", path.display(), e)))?;
conn.execute_batch(WORKS_FTS5_DDL)
.map_err(|e| Error::Parse(format!("works_fts DDL: {}", e)))?;
conn.execute("INSERT INTO works_fts(works_fts) VALUES('rebuild')", [])
.map_err(|e| Error::Parse(format!("works_fts rebuild: {}", e)))?;
Ok(())
}
/// Writes `rows` into `works` and the junction tables within the caller's transaction.
///
/// Each row is upserted with `SQLITE_INSERT`. Its existing `works_orcid`, `works_ror`
/// and `works_references` entries are then deleted and replaced by the row's
/// `orcids`, `rors` and `ref_ids`. Every statement, including the per-row DELETEs, is
/// prepared once and reused for all rows; the DELETEs used to be re-prepared per row.
/// An empty `rows` is a no-op.
///
/// # Errors
/// Returns the `sqlite_err` error for the first statement that fails to prepare or
/// execute. The caller decides whether to roll back.
pub(crate) fn write_rows_in_tx(
    tx: &rusqlite::Transaction,
    rows: &[PreparedRow],
) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let mut insert_work = tx
        .prepare(SQLITE_INSERT)
        .map_err(|e| sqlite_err(e, "failed to prepare insert"))?;
    let mut insert_orcid = tx
        .prepare(r#"INSERT OR IGNORE INTO works_orcid ("work_id","orcid") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare orcid insert"))?;
    let mut insert_ror = tx
        .prepare(r#"INSERT OR IGNORE INTO works_ror ("work_id","ror") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare ror insert"))?;
    let mut insert_ref = tx
        .prepare(r#"INSERT OR IGNORE INTO works_references ("work_id","ref_id") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare references insert"))?;
    let mut delete_orcids = tx
        .prepare(r#"DELETE FROM works_orcid WHERE "work_id" = ?1"#)
        .map_err(|e| sqlite_err(e, "failed to prepare orcid delete"))?;
    let mut delete_rors = tx
        .prepare(r#"DELETE FROM works_ror WHERE "work_id" = ?1"#)
        .map_err(|e| sqlite_err(e, "failed to prepare ror delete"))?;
    let mut delete_refs = tx
        .prepare(r#"DELETE FROM works_references WHERE "work_id" = ?1"#)
        .map_err(|e| sqlite_err(e, "failed to prepare references delete"))?;
    for row in rows {
        insert_work
            .execute(rusqlite::params![
                row.id, row.type_, row.url, row.title, row.subjects,
                row.language, row.date_published, row.date_updated, row.provider,
                row.pmid, row.pmcid, row.openalex, row.arxiv,
                row.valid as i32, row.metadata,
            ])
            .map_err(|e| sqlite_err(e, &format!("failed to insert '{}'", row.id)))?;
        // Replace (not merge) each junction set so entries removed from a record vanish.
        delete_orcids
            .execute([&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete orcid rows"))?;
        for orcid in &row.orcids {
            insert_orcid
                .execute(rusqlite::params![row.id, orcid])
                .map_err(|e| sqlite_err(e, "failed to insert orcid row"))?;
        }
        delete_rors
            .execute([&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete ror rows"))?;
        for ror in &row.rors {
            insert_ror
                .execute(rusqlite::params![row.id, ror])
                .map_err(|e| sqlite_err(e, "failed to insert ror row"))?;
        }
        delete_refs
            .execute([&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete reference rows"))?;
        for ref_id in &row.ref_ids {
            insert_ref
                .execute(rusqlite::params![row.id, ref_id])
                .map_err(|e| sqlite_err(e, "failed to insert reference row"))?;
        }
    }
    Ok(())
}
/// Writes `rows` in one transaction, then requests a passive WAL checkpoint.
///
/// An empty `rows` is a no-op. The checkpoint is best-effort and its result is ignored.
/// If writing fails, the transaction is dropped without being committed.
///
/// # Errors
/// Returns the `sqlite_err` error when the transaction cannot begin, a row cannot be
/// written, or the commit fails.
pub(crate) fn write_sqlite_batch_rows(
    conn: &rusqlite::Connection,
    rows: Vec<PreparedRow>,
) -> Result<()> {
    if !rows.is_empty() {
        let tx = match conn.unchecked_transaction() {
            Ok(tx) => tx,
            Err(e) => return Err(sqlite_err(e, "failed to begin transaction")),
        };
        write_rows_in_tx(&tx, &rows)?;
        if let Err(e) = tx.commit() {
            return Err(sqlite_err(e, "failed to commit transaction"));
        }
        let _ = conn.execute("PRAGMA wal_checkpoint(PASSIVE)", []);
    }
    Ok(())
}
/// Writes `data` into a fresh SQLite database at `path`, deleting any existing database
/// there first.
pub fn write_sqlite(data: &[Data], path: &Path) -> Result<()> {
    let overwrite = true;
    write_sqlite_impl(data, path, overwrite)
}

/// Inserts or replaces `data` in the SQLite database at `path`, keeping existing rows
/// for other ids.
pub fn upsert_sqlite(data: &[Data], path: &Path) -> Result<()> {
    let overwrite = false;
    write_sqlite_impl(data, path, overwrite)
}
fn write_sqlite_impl(data: &[Data], path: &Path, overwrite: bool) -> Result<()> {
let rows: Vec<PreparedRow> = data.iter().map(|d| serialize_to_row(d.clone())).collect();
let conn = init_sqlite_writer(path, overwrite)?;
write_sqlite_batch_rows(&conn, rows)
}
/// Returns the number of rows in the `works` table of the database at `path`.
///
/// A negative count reported by SQLite is clamped to 0.
///
/// # Errors
/// Failure to open the database or run the query is mapped to `Error::Parse`.
pub fn count_sqlite_works(path: &Path) -> Result<usize> {
    let count = rusqlite::Connection::open(path)
        .and_then(|conn| {
            conn.query_row("SELECT COUNT(rowid) FROM works", [], |row| row.get::<_, i64>(0))
        })
        .map_err(|e| Error::Parse(e.to_string()))?;
    Ok(if count < 0 { 0 } else { count as usize })
}
pub fn set_sqlite_setting(path: &Path, key: &str, value: &str) -> Result<()> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(e.to_string()))?;
conn.execute(
r#"INSERT OR REPLACE INTO settings ("key","value") VALUES (?1,?2)"#,
rusqlite::params![key, value],
)
.map_err(|e| Error::Parse(e.to_string()))?;
Ok(())
}
/// Looks up the value stored under `key` in the `settings` table.
///
/// Only a failure to open the database is reported as an error. Every query
/// failure (no matching row, a missing `settings` table, a value that cannot
/// be read as text, …) is reported as `Ok(None)`.
pub fn get_sqlite_setting(path: &Path, key: &str) -> Result<Option<String>> {
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(e.to_string()))?;
    let stored = conn
        .query_row(
            r#"SELECT "value" FROM settings WHERE "key" = ?1 LIMIT 1"#,
            rusqlite::params![key],
            |row| row.get::<_, String>(0),
        )
        .ok();
    Ok(stored)
}
/// Returns every `(key, value)` pair from the `settings` table, sorted by key.
///
/// If the query cannot be prepared (for example when the table does not exist),
/// an empty list is returned. Individual rows that fail to decode are skipped.
///
/// # Errors
/// Failure to open the database or start the query maps to `Error::Parse`.
pub fn get_all_sqlite_settings(path: &Path) -> Result<Vec<(String, String)>> {
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(e.to_string()))?;
    let sql = r#"SELECT "key","value" FROM settings ORDER BY "key""#;
    let mut stmt = if let Ok(prepared) = conn.prepare(sql) {
        prepared
    } else {
        return Ok(Vec::new());
    };
    let pairs = stmt
        .query_map([], |row| {
            let k: String = row.get(0)?;
            let v: String = row.get(1)?;
            Ok((k, v))
        })
        .map_err(|e| Error::Parse(e.to_string()))?
        .flatten()
        .collect();
    Ok(pairs)
}
/// Base bulk-read query: the `metadata` blob of every work in `rowid` order.
/// `read_sqlite_rows` appends `LIMIT`/`OFFSET` to it and zstd-decodes each blob.
const SQLITE_SELECT: &str = r#"SELECT "metadata" FROM works ORDER BY rowid"#;
fn read_sqlite_rows(
conn: &rusqlite::Connection,
limit: Option<usize>,
offset: usize,
) -> Result<Vec<Data>> {
let sql = match (limit, offset) {
(Some(n), o) => format!("{} LIMIT {} OFFSET {}", SQLITE_SELECT, n, o),
(None, o) if o > 0 => format!("{} LIMIT -1 OFFSET {}", SQLITE_SELECT, o),
_ => SQLITE_SELECT.to_string(),
};
let mut stmt = conn.prepare(&sql).map_err(|e| Error::Parse(e.to_string()))?;
let mut rows = stmt.query([]).map_err(|e| Error::Parse(e.to_string()))?;
let mut results = Vec::new();
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let blob: Vec<u8> = row
.get(0)
.map_err(|e| Error::Parse(format!("failed to read metadata blob: {}", e)))?;
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
results.push(data);
}
Ok(results)
}
pub fn read_sqlite_commonmeta(path: &Path, limit: Option<usize>, offset: usize) -> Result<Vec<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_rows(&conn, limit, offset)
}
/// Columns of the `works` table that can be used for a single-row lookup.
///
/// `read_sqlite_by_column` splices the column name into the SQL text.
/// Restricting it to this enum keeps that identifier to a fixed, trusted set;
/// the looked-up value itself is always bound as a parameter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum WorksColumn {
    Id,
    Pmid,
    Pmcid,
    Openalex,
    Arxiv,
}
impl WorksColumn {
    /// Returns the (unquoted) SQL column name for this variant.
    fn as_col_name(&self) -> &'static str {
        match self {
            Self::Id => "id",
            Self::Pmid => "pmid",
            Self::Pmcid => "pmcid",
            Self::Openalex => "openalex",
            Self::Arxiv => "arxiv",
        }
    }
}
pub fn read_sqlite_by_id(id: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_by_column(&conn, WorksColumn::Id, id)
}
/// Looks up a single work by PubMed id.
///
/// The input is normalized with `normalize_pmid` (NCBI resolver). If it cannot
/// be normalized, the raw input is used as the key.
pub fn read_sqlite_by_pmid(pmid: &str, path: &Path) -> Result<Option<Data>> {
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let normalized = crate::utils::normalize_pmid(pmid, crate::utils::PmcResolver::Ncbi);
    let key = normalized.as_deref().unwrap_or(pmid);
    read_sqlite_by_column(&conn, WorksColumn::Pmid, key)
}
/// Looks up a single work by PubMed Central id.
///
/// The input is normalized with `normalize_pmcid` (NCBI resolver). If it cannot
/// be normalized, the raw input is used as the key.
pub fn read_sqlite_by_pmcid(pmcid: &str, path: &Path) -> Result<Option<Data>> {
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let normalized = crate::utils::normalize_pmcid(pmcid, crate::utils::PmcResolver::Ncbi);
    let key = normalized.as_deref().unwrap_or(pmcid);
    read_sqlite_by_column(&conn, WorksColumn::Pmcid, key)
}
pub fn read_sqlite_by_openalex(openalex: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_by_column(&conn, WorksColumn::Openalex, openalex)
}
/// Looks up a single work by arXiv id.
///
/// The input is normalized with `normalize_arxiv`. If it cannot be normalized,
/// the raw input is used as the key.
pub fn read_sqlite_by_arxiv(arxiv: &str, path: &Path) -> Result<Option<Data>> {
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let normalized = crate::utils::normalize_arxiv(arxiv);
    let key = match &normalized {
        Some(n) => n.as_str(),
        None => arxiv,
    };
    read_sqlite_by_column(&conn, WorksColumn::Arxiv, key)
}
pub fn read_sqlite_by_dois(dois: &[String], path: &Path) -> Result<Vec<Data>> {
if dois.is_empty() {
return Ok(Vec::new());
}
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let keys: Vec<String> = dois
.iter()
.map(|d| crate::doi_utils::normalize_doi(d))
.filter(|s| !s.is_empty())
.collect();
if keys.is_empty() {
return Ok(Vec::new());
}
let placeholders = (1..=keys.len())
.map(|i| format!("?{}", i))
.collect::<Vec<_>>()
.join(", ");
let sql = format!(r#"SELECT "metadata" FROM works WHERE "id" IN ({})"#, placeholders);
let mut stmt = conn.prepare(&sql).map_err(|e| Error::Parse(e.to_string()))?;
let params: Vec<&dyn rusqlite::types::ToSql> =
keys.iter().map(|s| s as &dyn rusqlite::types::ToSql).collect();
let mut rows = stmt.query(params.as_slice()).map_err(|e| Error::Parse(e.to_string()))?;
let mut results = Vec::new();
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let blob: Vec<u8> = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
results.push(data);
}
Ok(results)
}
/// Returns every work linked to an ORCID through the `works_orcid` table.
///
/// Input that does not already start with `https://orcid.org/` is treated as a
/// bare identifier and gets that prefix.
pub fn read_sqlite_by_orcid(orcid_url: &str, path: &Path) -> Result<Vec<Data>> {
    const ORCID_PREFIX: &str = "https://orcid.org/";
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let mut orcid = String::with_capacity(ORCID_PREFIX.len() + orcid_url.len());
    if !orcid_url.starts_with(ORCID_PREFIX) {
        orcid.push_str(ORCID_PREFIX);
    }
    orcid.push_str(orcid_url);
    read_sqlite_by_junction(&conn, "works_orcid", "orcid", &orcid)
}
pub fn read_sqlite_by_citation(doi: &str, path: &Path) -> Result<Vec<Data>> {
let id = crate::doi_utils::normalize_doi(doi);
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("open sqlite '{}': {}", path.display(), e)))?;
read_sqlite_by_junction(&conn, "works_references", "ref_id", &id)
}
/// Returns every work linked to a ROR organization through the `works_ror` table.
///
/// Input that does not already start with `https://ror.org/` is treated as a
/// bare identifier and gets that prefix.
pub fn read_sqlite_by_ror(ror_url: &str, path: &Path) -> Result<Vec<Data>> {
    const ROR_PREFIX: &str = "https://ror.org/";
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let mut ror = String::with_capacity(ROR_PREFIX.len() + ror_url.len());
    if !ror_url.starts_with(ROR_PREFIX) {
        ror.push_str(ROR_PREFIX);
    }
    ror.push_str(ror_url);
    read_sqlite_by_junction(&conn, "works_ror", "ror", &ror)
}
fn read_sqlite_by_junction(
conn: &rusqlite::Connection,
table: &'static str,
col: &'static str,
val: &str,
) -> Result<Vec<Data>> {
let sql = format!(
r#"SELECT w."metadata" FROM works w
JOIN "{table}" j ON j."work_id" = w."id"
WHERE j."{col}" = ?1
ORDER BY w."date_published" DESC"#
);
let mut stmt = conn.prepare(&sql).map_err(|e| Error::Parse(e.to_string()))?;
let mut rows = stmt.query(rusqlite::params![val]).map_err(|e| Error::Parse(e.to_string()))?;
let mut results = Vec::new();
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let blob: Vec<u8> = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
results.push(data);
}
Ok(results)
}
fn read_sqlite_by_column(conn: &rusqlite::Connection, col: WorksColumn, val: &str) -> Result<Option<Data>> {
let sql = format!(
r#"SELECT "metadata" FROM works WHERE "{}" = ?1 LIMIT 1"#,
col.as_col_name()
);
let result = conn.query_row(&sql, rusqlite::params![val], |row| row.get::<_, Vec<u8>>(0));
match result {
Ok(blob) => {
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
Ok(Some(data))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(Error::Parse(e.to_string())),
}
}
/// Updates existing `works` rows (via `SQLITE_UPDATE`) and refreshes their
/// ORCID, ROR and reference junction rows, all inside `tx`.
///
/// Returns the total number of `works` rows changed by the UPDATE statements.
/// When a row's UPDATE changes nothing (presumably no work with that id exists;
/// `SQLITE_UPDATE` is defined elsewhere), its junction rows are left untouched.
/// This avoids inserting orphaned `works_orcid`/`works_ror`/`works_references`
/// rows for a work that is not in `works`.
///
/// # Errors
/// Any failing statement aborts with an error. The caller's transaction is not
/// committed by this function.
pub(crate) fn update_rows_in_tx(
    tx: &rusqlite::Transaction,
    rows: &[PreparedRow],
) -> Result<usize> {
    if rows.is_empty() {
        return Ok(0);
    }
    let mut stmt = tx
        .prepare(SQLITE_UPDATE)
        .map_err(|e| sqlite_err(e, "failed to prepare update"))?;
    let mut orcid_stmt = tx
        .prepare(r#"INSERT OR IGNORE INTO works_orcid ("work_id","orcid") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare orcid insert"))?;
    let mut ror_stmt = tx
        .prepare(r#"INSERT OR IGNORE INTO works_ror ("work_id","ror") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare ror insert"))?;
    let mut ref_stmt = tx
        .prepare(r#"INSERT OR IGNORE INTO works_references ("work_id","ref_id") VALUES (?1,?2)"#)
        .map_err(|e| sqlite_err(e, "failed to prepare references insert"))?;
    let mut count = 0usize;
    for row in rows {
        let updated = stmt
            .execute(rusqlite::params![
                row.id, row.type_, row.url, row.title, row.subjects,
                row.language, row.date_published, row.date_updated, row.provider,
                row.pmid, row.pmcid, row.openalex, row.arxiv,
                row.valid as i32, row.metadata,
            ])
            .map_err(|e| sqlite_err(e, &format!("failed to update '{}'", row.id)))?;
        count += updated;
        if updated == 0 {
            // Nothing was updated: do not create junction rows for a missing work.
            continue;
        }
        // Replace (not merge) each junction set so it mirrors the new metadata.
        tx.execute(r#"DELETE FROM works_orcid WHERE "work_id" = ?1"#, [&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete orcid rows"))?;
        for orcid in &row.orcids {
            orcid_stmt.execute(rusqlite::params![row.id, orcid])
                .map_err(|e| sqlite_err(e, "failed to insert orcid row"))?;
        }
        tx.execute(r#"DELETE FROM works_ror WHERE "work_id" = ?1"#, [&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete ror rows"))?;
        for ror in &row.rors {
            ror_stmt.execute(rusqlite::params![row.id, ror])
                .map_err(|e| sqlite_err(e, "failed to insert ror row"))?;
        }
        tx.execute(r#"DELETE FROM works_references WHERE "work_id" = ?1"#, [&row.id])
            .map_err(|e| sqlite_err(e, "failed to delete reference rows"))?;
        for ref_id in &row.ref_ids {
            ref_stmt.execute(rusqlite::params![row.id, ref_id])
                .map_err(|e| sqlite_err(e, "failed to insert reference row"))?;
        }
    }
    Ok(count)
}
use crate::schema_utils::collect_leaf_errors;
/// Schema-validation failures recorded for a single work.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ValidationError {
    /// The work's `id` column value.
    pub id: String,
    /// Leaf-level schema error messages reported for this work.
    pub errors: Vec<String>,
}
/// Summary returned by `validate_sqlite`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ValidationReport {
    /// Number of works read from the database, including any that failed to decode.
    pub total: usize,
    /// Works that passed validation, either as stored or after repair.
    pub valid: usize,
    /// Number of works that still failed validation (the length of `errors`).
    pub invalid: usize,
    /// Works that passed only after repair; these are also counted in `valid`.
    pub fixed: usize,
    /// Per-work validation failures.
    pub errors: Vec<ValidationError>,
}
/// SQL predicate that selects works for the provider bound to `?1`.
///
/// A work matches when either `works.provider` equals `?1`, or the DOI prefix
/// embedded in `works.id` has a row in the `prefixes` table whose `ra` column
/// equals `?1`. The prefix is the text from the first `10.` up to, but not
/// including, the next `/`; `ra` is presumably the registration agency (confirm
/// against `ensure_prefixes_table`).
const PROVIDER_FILTER: &str = concat!(
r#"(works."provider" = ?1 OR EXISTS ("#,
"SELECT 1 FROM prefixes WHERE ",
"prefix = SUBSTR(works.id, INSTR(works.id, '10.'), ",
"INSTR(SUBSTR(works.id, INSTR(works.id, '10.')), '/') - 1) ",
"AND ra = ?1))"
);
/// Validates stored works against the embedded commonmeta JSON schema.
///
/// Works are read in `rowid` order in batches of 10,000. Optional filters are:
/// provider (`PROVIDER_FILTER`), `type`, exact `id`, and presence of a
/// `works_ror` row. With `recheck`, only works with `valid = 0` are scanned.
/// `limit = 0` means no limit. For each batch:
/// - passing works get `valid = 1` and their `validation_errors` row removed;
/// - with `fix`, a failing work is re-serialized via `serialize_to_row`. If the
///   repaired document passes, its `metadata` is rewritten and it is marked valid;
/// - remaining failures are upserted into `validation_errors` and returned in
///   the report.
///
/// Works whose blob fails to decompress or parse are logged to stderr and
/// skipped. They count towards `total` only.
///
/// # Errors
/// Fails if the schema cannot be parsed or compiled, the database cannot be
/// opened, the `validation_errors` table cannot be created, a batch query fails,
/// or a batch transaction cannot be begun or committed. Failures of individual
/// per-row UPDATE/DELETE/upsert statements are ignored.
pub fn validate_sqlite(
    path: &Path,
    provider: Option<&str>,
    work_type: Option<&str>,
    id: Option<&str>,
    has_ror_id: bool,
    limit: usize,
    fix: bool,
    recheck: bool,
) -> Result<ValidationReport> {
    use serde_json::Value;
    use crate::schema_utils::SCHEMA_JSON;
    let schema_json: Value = serde_json::from_str(SCHEMA_JSON)
        .map_err(|e| Error::Parse(format!("failed to parse commonmeta schema: {e}")))?;
    // Hoist the record schema nested under "commonmeta" to the top level,
    // alongside the shared "$schema", "$id" and "definitions" keys.
    let validation_schema = {
        let mut merged = serde_json::Map::new();
        if let Some(v) = schema_json.get("$schema") { merged.insert("$schema".to_string(), v.clone()); }
        if let Some(v) = schema_json.get("$id") { merged.insert("$id".to_string(), v.clone()); }
        if let Some(v) = schema_json.get("definitions") { merged.insert("definitions".to_string(), v.clone()); }
        if let Some(Value::Object(cm)) = schema_json.get("commonmeta") {
            for (k, v) in cm { merged.insert(k.clone(), v.clone()); }
        }
        Value::Object(merged)
    };
    let compiled = jsonschema::validator_for(&validation_schema)
        .map_err(|e| Error::Parse(format!("failed to compile commonmeta schema: {e}")))?;
    let conn = rusqlite::Connection::open(path)
        .map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
    let _ = conn.execute_batch("PRAGMA cache_size=-65536; PRAGMA mmap_size=4294967296;");
    conn.execute_batch(
        r#"CREATE TABLE IF NOT EXISTS validation_errors (
id TEXT PRIMARY KEY,
errors TEXT NOT NULL,
checked_at TEXT NOT NULL
);"#,
    ).map_err(|e| Error::Parse(format!("failed to create validation_errors table: {e}")))?;
    crate::doi_utils::ensure_prefixes_table(&conn);
    // Best-effort: fails harmlessly when the column already exists.
    let _ = conn.execute_batch(r#"ALTER TABLE works ADD COLUMN "valid" INTEGER NOT NULL DEFAULT 0;"#);
    if fix || recheck {
        let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
    }
    // Placeholders are fixed: ?1 provider, ?2 type, ?3 id; LIMIT/OFFSET follow.
    let mut where_parts = Vec::new();
    if recheck { where_parts.push(r#"works."valid" = 0"#); }
    if provider.is_some() { where_parts.push(PROVIDER_FILTER); }
    if work_type.is_some() { where_parts.push(r#"works."type" = ?2"#); }
    if id.is_some() { where_parts.push(r#"works."id" = ?3"#); }
    if has_ror_id { where_parts.push(r#"EXISTS (SELECT 1 FROM works_ror WHERE works_ror."work_id" = works."id")"#); }
    let where_sql = if where_parts.is_empty() {
        String::new()
    } else {
        format!("WHERE {}", where_parts.join(" AND "))
    };
    // Unfiltered: MAX(rowid) is a cheap approximation of the row count.
    let count_sql = if where_sql.is_empty() {
        "SELECT COALESCE(MAX(rowid), 0) FROM works".to_string()
    } else {
        format!(r#"SELECT COUNT(rowid) FROM works {where_sql}"#)
    };
    let (limit_ph, offset_ph) = if id.is_some() { ("?4", "?5") } else { ("?3", "?4") };
    let cursor_sql = format!(
        r#"SELECT works.id, works.metadata FROM works {where_sql}
ORDER BY works.rowid LIMIT {limit_ph} OFFSET {offset_ph}"#
    );
    let provider_param = provider.unwrap_or("");
    let type_param = work_type.unwrap_or("");
    let id_param = id.unwrap_or("");
    // Bind exactly as many parameters as the count statement declares. rusqlite
    // rejects surplus parameters (InvalidParameterCount), which previously made
    // the count fail, and silently fall back to 0, whenever `?1`/`?2` were absent.
    let filter_params: [&dyn rusqlite::types::ToSql; 3] = [&provider_param, &type_param, &id_param];
    let row_count: u64 = conn
        .prepare(&count_sql)
        .and_then(|mut count_stmt| {
            let bound = count_stmt.parameter_count().min(filter_params.len());
            count_stmt.query_row(&filter_params[..bound], |r| r.get::<_, i64>(0))
        })
        .unwrap_or(0)
        .max(0) as u64;
    let total_to_check = if limit == 0 { row_count } else { row_count.min(limit as u64) };
    let bar = crate::progress::count_bar("validating", total_to_check);
    // The cursor's LIMIT/OFFSET placeholders are always the highest-numbered,
    // so binding all filter params plus limit/offset always matches its count.
    let mut stmt = conn.prepare(&cursor_sql)
        .map_err(|e| Error::Parse(e.to_string()))?;
    const BATCH: usize = 10_000;
    let mut valid = 0usize;
    let mut fixed = 0usize;
    let mut report_errors: Vec<ValidationError> = Vec::new();
    // Number of rows of the current result set to skip on the next query.
    let mut offset = 0usize;
    let mut total = 0usize;
    let upsert_error_sql = r#"INSERT INTO validation_errors (id, errors, checked_at)
VALUES (?1, ?2, datetime('now'))
ON CONFLICT(id) DO UPDATE SET errors = excluded.errors, checked_at = excluded.checked_at"#;
    loop {
        let remaining = if limit == 0 { BATCH } else { limit.saturating_sub(total) };
        if remaining == 0 { break; }
        let batch_size = BATCH.min(remaining);
        let raw_batch: Vec<(String, Vec<u8>)> = {
            let mut rows = if id.is_some() {
                stmt.query(rusqlite::params![provider_param, type_param, id_param, batch_size as i64, offset as i64])
            } else {
                stmt.query(rusqlite::params![provider_param, type_param, batch_size as i64, offset as i64])
            }.map_err(|e| Error::Parse(e.to_string()))?;
            let mut v = Vec::with_capacity(batch_size);
            while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
                let work_id: String = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
                let blob: Vec<u8> = row.get(1).map_err(|e| Error::Parse(e.to_string()))?;
                v.push((work_id, blob));
            }
            v
        };
        if raw_batch.is_empty() { break; }
        let batch_count = raw_batch.len();
        let mut passing_ids: Vec<String> = Vec::new();
        let mut fix_updates: Vec<(Vec<u8>, String)> = Vec::new();
        let mut error_pairs: Vec<(String, Vec<String>)> = Vec::new();
        for (work_id, blob) in &raw_batch {
            let decompressed = match zstd::decode_all(std::io::Cursor::new(blob)) {
                Ok(d) => d,
                Err(e) => {
                    eprintln!("validate: decompress '{}': {}", work_id, e);
                    bar.inc(1);
                    continue;
                }
            };
            let doc: Value = match serde_json::from_slice(&decompressed) {
                Ok(v) => v,
                Err(e) => {
                    eprintln!("validate: parse '{}': {}", work_id, e);
                    bar.inc(1);
                    continue;
                }
            };
            let raw_errs: Vec<jsonschema::ValidationError<'_>> =
                compiled.iter_errors(&doc).collect();
            let errs = collect_leaf_errors(&raw_errs);
            if errs.is_empty() {
                passing_ids.push(work_id.clone());
                valid += 1;
            } else if fix {
                if let Ok(data) = serde_json::from_value::<Data>(doc) {
                    // Re-serialize through the normal write path, then re-validate
                    // exactly what would be stored.
                    let repaired_row = serialize_to_row(data);
                    let repaired_doc: Value = serde_json::from_slice(
                        &zstd::decode_all(std::io::Cursor::new(&repaired_row.metadata))
                            .unwrap_or_default()
                    ).unwrap_or(Value::Null);
                    let repaired_raw: Vec<jsonschema::ValidationError<'_>> =
                        compiled.iter_errors(&repaired_doc).collect();
                    let remaining_errs = collect_leaf_errors(&repaired_raw);
                    if remaining_errs.is_empty() {
                        fix_updates.push((repaired_row.metadata, work_id.clone()));
                        fixed += 1;
                        valid += 1;
                    } else {
                        error_pairs.push((work_id.clone(), remaining_errs.clone()));
                        report_errors.push(ValidationError { id: work_id.clone(), errors: remaining_errs });
                    }
                } else {
                    error_pairs.push((work_id.clone(), errs.clone()));
                    report_errors.push(ValidationError { id: work_id.clone(), errors: errs });
                }
            } else {
                error_pairs.push((work_id.clone(), errs.clone()));
                report_errors.push(ValidationError { id: work_id.clone(), errors: errs });
            }
            bar.inc(1);
        }
        let tx = conn.unchecked_transaction()
            .map_err(|e| Error::Parse(format!("begin transaction: {e}")))?;
        // Rows actually switched to `valid = 1` in this batch. In recheck mode
        // these drop out of the `valid = 0` result set.
        let mut newly_valid = 0usize;
        for work_id in &passing_ids {
            if tx.execute(r#"UPDATE works SET "valid" = 1 WHERE id = ?1"#, [work_id]).map_or(false, |n| n > 0) {
                newly_valid += 1;
            }
            tx.execute("DELETE FROM validation_errors WHERE id = ?1", [work_id]).ok();
        }
        for (meta, work_id) in &fix_updates {
            if tx.execute(
                r#"UPDATE works SET "metadata" = ?1, "valid" = 1 WHERE id = ?2"#,
                rusqlite::params![meta, work_id],
            ).map_or(false, |n| n > 0) {
                newly_valid += 1;
            }
            tx.execute("DELETE FROM validation_errors WHERE id = ?1", [work_id]).ok();
        }
        for (work_id, errors) in &error_pairs {
            let errors_json = serde_json::to_string(errors).unwrap_or_default();
            tx.execute(upsert_error_sql, rusqlite::params![work_id, errors_json]).ok();
        }
        tx.commit().map_err(|e| Error::Parse(format!("commit transaction: {e}")))?;
        total += batch_count;
        // In recheck mode, rows that became valid leave the result set. Rows still
        // invalid stay ahead of the unprocessed rows (same rowid order), so skip
        // exactly those. This prevents re-checking them, duplicating their
        // errors, or stopping early.
        offset += if recheck { batch_count.saturating_sub(newly_valid) } else { batch_count };
        if batch_count < batch_size { break; }
    }
    bar.finish_and_clear();
    Ok(ValidationReport {
        total,
        valid,
        invalid: report_errors.len(),
        fixed,
        errors: report_errors,
    })
}
/// Summary returned by `fill_sqlite`.
#[derive(Debug, Default)]
pub struct FillReport {
    /// Number of works read, including any whose blob failed to decode.
    pub total: usize,
    /// Number of works whose metadata was modified and queued for rewrite.
    pub changed: usize,
    /// Number of affiliations, organization contributors and funding references
    /// that received a result from the ROR lookup.
    pub affiliations_filled: usize,
}
pub fn fill_sqlite(
path: &Path,
ror_db_path: &Path,
provider: Option<&str>,
work_type: Option<&str>,
id: Option<&str>,
has_ror_id: bool,
limit: usize,
) -> Result<FillReport> {
use crate::formats::ror::lookup_org_sqlite;
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let _ = conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA cache_size=-65536;");
crate::doi_utils::ensure_prefixes_table(&conn);
let (where_sql, count_sql, cursor_sql) = build_fill_query(provider, work_type, id, has_ror_id);
let provider_param = provider.unwrap_or("");
let type_param = work_type.unwrap_or("");
let id_param = id.unwrap_or("");
let row_count: u64 = if id.is_some() {
conn.query_row(&count_sql, rusqlite::params![provider_param, type_param, id_param], |r| r.get::<_, i64>(0))
} else {
conn.query_row(&count_sql, rusqlite::params![provider_param, type_param], |r| r.get::<_, i64>(0))
}.unwrap_or(0).max(0) as u64;
let _ = where_sql;
let total_to_check = if limit == 0 {
row_count
} else {
row_count.min(limit as u64)
};
let bar = crate::progress::count_bar("filling", total_to_check);
let mut stmt = conn
.prepare(&cursor_sql)
.map_err(|e| Error::Parse(e.to_string()))?;
const BATCH: usize = 10_000;
let mut total = 0usize;
let mut changed = 0usize;
let mut affiliations_filled = 0usize;
let mut offset = 0usize;
loop {
let remaining = if limit == 0 {
BATCH
} else {
limit.saturating_sub(total)
};
if remaining == 0 {
break;
}
let batch_size = BATCH.min(remaining);
let raw_batch: Vec<(String, Vec<u8>)> = {
let mut rows = if id.is_some() {
stmt.query(rusqlite::params![provider_param, type_param, id_param, batch_size as i64, offset as i64])
} else {
stmt.query(rusqlite::params![provider_param, type_param, batch_size as i64, offset as i64])
}.map_err(|e| Error::Parse(e.to_string()))?;
let mut v = Vec::with_capacity(batch_size);
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let work_id: String = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
let blob: Vec<u8> = row.get(1).map_err(|e| Error::Parse(e.to_string()))?;
v.push((work_id, blob));
}
v
};
if raw_batch.is_empty() {
break;
}
let batch_count = raw_batch.len();
let mut fill_updates: Vec<(Vec<u8>, String)> = Vec::new();
for (id, blob) in &raw_batch {
let decompressed = match zstd::decode_all(std::io::Cursor::new(blob)) {
Ok(d) => d,
Err(e) => {
eprintln!("fill: decompress '{}': {}", id, e);
bar.inc(1);
continue;
}
};
let mut data: Data = match serde_json::from_slice(&decompressed) {
Ok(d) => d,
Err(e) => {
eprintln!("fill: parse '{}': {}", id, e);
bar.inc(1);
continue;
}
};
let mut record_changed = false;
for contributor in &mut data.contributors {
if let Some(person) = &mut contributor.person {
for aff in &mut person.affiliations {
if let Some((ror_id, name)) =
fill_affiliation_lookup(aff, ror_db_path, &lookup_org_sqlite)
{
aff.id = ror_id;
aff.name = name;
aff.asserted_by = "Commonmeta".to_string();
affiliations_filled += 1;
record_changed = true;
}
}
}
if let Some(org) = &mut contributor.organization {
if let Some((ror_id, name)) =
fill_org_lookup(org, ror_db_path, &lookup_org_sqlite)
{
org.id = ror_id;
if org.name.is_empty() {
org.name = name;
}
org.asserted_by = "Commonmeta".to_string();
affiliations_filled += 1;
record_changed = true;
}
}
}
for funder in &mut data.funding_references {
if let Some((ror_id, name)) =
fill_funding_lookup(funder, ror_db_path, &lookup_org_sqlite)
{
funder.funder_id = ror_id;
funder.funder_name = name;
funder.asserted_by = "Commonmeta".to_string();
affiliations_filled += 1;
record_changed = true;
}
}
if record_changed {
let row = serialize_to_row(data);
fill_updates.push((row.metadata, id.clone()));
changed += 1;
}
bar.inc(1);
}
if !fill_updates.is_empty() {
let tx = conn
.unchecked_transaction()
.map_err(|e| Error::Parse(format!("begin transaction: {e}")))?;
for (meta, id) in &fill_updates {
tx.execute(
r#"UPDATE works SET "metadata" = ?1 WHERE id = ?2"#,
rusqlite::params![meta, id],
)
.ok();
}
tx.commit()
.map_err(|e| Error::Parse(format!("commit transaction: {e}")))?;
}
total += batch_count;
offset += batch_count;
if batch_count < batch_size {
break;
}
}
bar.finish_and_clear();
Ok(FillReport {
total,
changed,
affiliations_filled,
})
}
/// Builds the SQL used by `fill_sqlite`: `(where_sql, count_sql, cursor_sql)`.
///
/// Each filter is included only when its argument is set, with fixed
/// placeholders: `?1` provider (`PROVIDER_FILTER`), `?2` type, `?3` id.
/// `has_ror_id` adds a parameterless `works_ror` existence check. The cursor's
/// LIMIT/OFFSET placeholders come right after the last possible filter
/// placeholder (`?4`/`?5` with an id filter, `?3`/`?4` otherwise).
fn build_fill_query(
    provider: Option<&str>,
    work_type: Option<&str>,
    id: Option<&str>,
    has_ror_id: bool,
) -> (String, String, String) {
    let candidate_filters: [(bool, &str); 4] = [
        (provider.is_some(), PROVIDER_FILTER),
        (work_type.is_some(), r#"works."type" = ?2"#),
        (id.is_some(), r#"works."id" = ?3"#),
        (has_ror_id, r#"EXISTS (SELECT 1 FROM works_ror WHERE works_ror."work_id" = works."id")"#),
    ];
    let active: Vec<&str> = candidate_filters
        .iter()
        .filter(|&&(enabled, _)| enabled)
        .map(|&(_, clause)| clause)
        .collect();
    let where_sql = if !active.is_empty() {
        format!("WHERE {}", active.join(" AND "))
    } else {
        String::new()
    };
    // Without filters, MAX(rowid) serves as a cheap approximation of the count.
    let count_sql = if !where_sql.is_empty() {
        format!("SELECT COUNT(rowid) FROM works {}", where_sql)
    } else {
        "SELECT COALESCE(MAX(rowid), 0) FROM works".to_string()
    };
    let (limit_ph, offset_ph) = match id {
        Some(_) => ("?4", "?5"),
        None => ("?3", "?4"),
    };
    let cursor_sql = format!(
        r#"SELECT works.id, works.metadata FROM works {where_sql}
ORDER BY works.rowid LIMIT {limit_ph} OFFSET {offset_ph}"#
    );
    (where_sql, count_sql, cursor_sql)
}
fn fill_affiliation_lookup(
aff: &crate::data::Affiliation,
ror_db_path: &Path,
lookup: &dyn Fn(&str, &Path) -> Option<(String, String)>,
) -> Option<(String, String)> {
use crate::utils::validate_id;
if aff.id.is_empty() {
return None;
}
let (_, id_type) = validate_id(&aff.id);
match id_type {
"ROR" => {
if aff.name.is_empty() {
lookup(&aff.id, ror_db_path)
} else {
None
}
}
"Crossref Funder ID" | "ISNI" => lookup(&aff.id, ror_db_path),
_ => None,
}
}
fn fill_funding_lookup(
funder: &crate::data::FundingReference,
ror_db_path: &Path,
lookup: &dyn Fn(&str, &Path) -> Option<(String, String)>,
) -> Option<(String, String)> {
use crate::utils::validate_id;
if funder.funder_id.is_empty() {
return None;
}
let (_, id_type) = validate_id(&funder.funder_id);
match id_type {
"ROR" => {
if funder.funder_name.is_empty() {
lookup(&funder.funder_id, ror_db_path)
} else {
None
}
}
"Crossref Funder ID" | "ISNI" => lookup(&funder.funder_id, ror_db_path),
_ => None,
}
}
fn fill_org_lookup(
org: &crate::data::Organization,
ror_db_path: &Path,
lookup: &dyn Fn(&str, &Path) -> Option<(String, String)>,
) -> Option<(String, String)> {
use crate::utils::validate_id;
if org.id.is_empty() {
return None;
}
let (_, id_type) = validate_id(&org.id);
match id_type {
"ROR" => {
if org.name.is_empty() {
lookup(&org.id, ror_db_path)
} else {
None
}
}
"Crossref Funder ID" | "ISNI" => lookup(&org.id, ror_db_path),
_ => None,
}
}
/// Decodes an in-memory Parquet file into commonmeta records.
///
/// Every row group is read into `CommonmetaRow`s through the parquet crate's
/// `RecordReader` trait. Each row is then converted with `unflatten_row`.
///
/// # Errors
/// Reader construction and row-group read failures map to `Error::Parse`.
pub fn read_parquet_all(bytes: &[u8]) -> Result<Vec<Data>> {
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::RecordReader;
    let file_reader = SerializedFileReader::new(::bytes::Bytes::from(bytes.to_vec()))
        .map_err(|e| Error::Parse(e.to_string()))?;
    let mut records: Vec<CommonmetaRow> = Vec::new();
    let group_count = file_reader.num_row_groups();
    for group_index in 0..group_count {
        let mut group = file_reader
            .get_row_group(group_index)
            .map_err(|e| Error::Parse(e.to_string()))?;
        let row_total = group.metadata().num_rows() as usize;
        records
            .read_from_row_group(&mut *group, row_total)
            .map_err(|e| Error::Parse(e.to_string()))?;
    }
    let decoded = records.iter().map(unflatten_row).collect();
    Ok(decoded)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::{Contributor, Identifier, Person};
fn sample_data() -> Data {
Data {
id: "https://doi.org/10.1234/abc".to_string(),
type_: "JournalArticle".to_string(),
title: "A Sample Title".to_string(),
identifiers: vec![Identifier {
identifier: "10.1234/abc".to_string(),
identifier_type: "DOI".to_string(),
..Default::default()
}],
contributors: vec![Contributor::person(
Person {
given_name: "Jane".to_string(),
family_name: "Doe".to_string(),
id: "https://orcid.org/0000-0002-1825-0097".to_string(),
..Default::default()
},
Vec::new(),
)],
..Data::default()
}
}
#[test]
fn test_flatten_row_basic() {
let row = flatten_row(&sample_data());
assert_eq!(row.id, "https://doi.org/10.1234/abc");
assert_eq!(row.record_type, "JournalArticle");
assert_eq!(row.title, "A Sample Title");
assert_eq!(row.doi, "10.1234/abc");
assert_eq!(row.first_author_name, "Jane Doe");
assert_eq!(
row.first_author_orcid,
"https://orcid.org/0000-0002-1825-0097"
);
assert_eq!(row.contributor_count, 1);
}
#[test]
fn test_flatten_row_doi_fallback_from_id() {
let mut data = sample_data();
data.identifiers.clear();
let row = flatten_row(&data);
assert_eq!(row.doi, "https://doi.org/10.1234/abc");
}
#[test]
fn test_write_parquet_all_roundtrip() {
let list = vec![sample_data()];
let bytes = write_parquet_all(&list).unwrap();
assert!(!bytes.is_empty());
assert_eq!(&bytes[0..4], b"PAR1");
assert_eq!(&bytes[bytes.len() - 4..], b"PAR1");
}
#[test]
fn test_write_parquet_all_empty() {
let list: Vec<Data> = vec![];
let bytes = write_parquet_all(&list).unwrap();
assert_eq!(&bytes[0..4], b"PAR1");
}
#[test]
fn test_write_parquet_all_readable_schema_and_rows() {
use parquet::file::reader::{FileReader, SerializedFileReader};
let list = vec![sample_data(), sample_data()];
let bytes = write_parquet_all(&list).unwrap();
let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes)).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.file_metadata().num_rows(), 2);
let schema = metadata.file_metadata().schema_descr();
let column_names: Vec<String> = (0..schema.num_columns())
.map(|i| schema.column(i).name().to_string())
.collect();
assert!(column_names.iter().any(|c| c == "id"));
assert!(column_names.iter().any(|c| c == "record_type"));
assert!(column_names.iter().any(|c| c == "title"));
assert!(column_names.iter().any(|c| c == "doi"));
assert!(column_names.iter().any(|c| c == "first_author_name"));
}
#[test]
fn test_write_parquet_chunked_uses_multiple_row_groups_in_one_file() {
use parquet::file::reader::{FileReader, SerializedFileReader};
let list = vec![sample_data(), sample_data(), sample_data()];
let bytes = write_parquet_chunked(&list, 1).unwrap();
let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes.clone())).unwrap();
assert_eq!(reader.num_row_groups(), 3);
assert_eq!(reader.metadata().file_metadata().num_rows(), 3);
let roundtripped = read_parquet_all(&bytes).unwrap();
assert_eq!(roundtripped.len(), 3);
}
#[test]
fn test_write_read_parquet_roundtrip() {
let list = vec![sample_data()];
let bytes = write_parquet_all(&list).unwrap();
let roundtripped = read_parquet_all(&bytes).unwrap();
assert_eq!(roundtripped.len(), 1);
assert_eq!(roundtripped[0], list[0]);
}
#[test]
fn test_write_read_parquet_roundtrip_preserves_fields_outside_flattened_view() {
use crate::data::{Affiliation, Description, Subject, Title};
let mut data = sample_data();
data.additional_titles.push(Title {
title: "An Alternative Title".to_string(),
type_: "TranslatedTitle".to_string(),
..Default::default()
});
data.contributors.push(Contributor::person(
Person {
given_name: "John".to_string(),
family_name: "Smith".to_string(),
affiliations: vec![Affiliation {
id: "https://ror.org/02catss52".to_string(),
name: "Example University".to_string(),
..Default::default()
}],
..Default::default()
},
Vec::new(),
));
data.identifiers.push(Identifier {
identifier: "1234-5678".to_string(),
identifier_type: "ISSN".to_string(),
..Default::default()
});
data.additional_descriptions.push(Description {
description: "A second description".to_string(),
type_: "TechnicalInfo".to_string(),
..Default::default()
});
data.subjects = vec![
Subject {
subject: "Biology".to_string(),
..Default::default()
},
Subject {
subject: "Chemistry".to_string(),
..Default::default()
},
];
let bytes = write_parquet_all(&[data.clone()]).unwrap();
let roundtripped = read_parquet_all(&bytes).unwrap();
assert_eq!(roundtripped.len(), 1);
assert_eq!(roundtripped[0], data);
assert_eq!(roundtripped[0].additional_titles.len(), 1);
assert_eq!(roundtripped[0].contributors.len(), 2);
assert_eq!(
roundtripped[0].contributors[1].affiliations()[0].name,
"Example University"
);
assert_eq!(roundtripped[0].identifiers.len(), 2);
assert_eq!(roundtripped[0].additional_descriptions.len(), 1);
assert_eq!(roundtripped[0].subjects.len(), 2);
}
#[test]
fn test_read_parquet_all_empty() {
let bytes = write_parquet_all(&[]).unwrap();
let roundtripped = read_parquet_all(&bytes).unwrap();
assert!(roundtripped.is_empty());
}
#[test]
fn test_write_sqlite_creates_works_table() {
    // Write one sample record, then inspect the `works` table directly:
    // row count, the plain `id`/`title`/`type` columns, and the
    // zstd-compressed JSON stored in `metadata`.
    let tmp_dir = std::env::temp_dir().join("commonmeta_sqlite_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");
    write_sqlite(&[sample_data()], &db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();

    let rows: i64 = conn
        .query_row("SELECT COUNT(rowid) FROM works", [], |row| row.get(0))
        .unwrap();
    assert_eq!(rows, 1);

    let (id, title, work_type): (String, String, String) = conn
        .query_row(r#"SELECT "id", "title", "type" FROM works"#, [], |row| {
            Ok((row.get(0)?, row.get(1)?, row.get(2)?))
        })
        .unwrap();
    assert_eq!(id, "https://doi.org/10.1234/abc");
    assert_eq!(title, "A Sample Title");
    assert_eq!(work_type, "JournalArticle");

    // `metadata` is a zstd-compressed JSON serialization of the record.
    let compressed: Vec<u8> = conn
        .query_row("SELECT metadata FROM works", [], |row| row.get(0))
        .unwrap();
    let json_bytes = zstd::decode_all(compressed.as_slice()).unwrap();
    let metadata: serde_json::Value = serde_json::from_slice(&json_bytes).unwrap();
    assert_eq!(metadata["contributors"].as_array().unwrap().len(), 1);

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_write_sqlite_roundtrip_provider() {
    // The record's `provider` field must be stored unchanged in the `provider` column.
    let tmp_dir = std::env::temp_dir().join("commonmeta_sqlite_test_sv");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");
    write_sqlite(&[sample_data()], &db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    let stored: String = conn
        .query_row("SELECT provider FROM works", [], |row| row.get(0))
        .unwrap();
    let expected = sample_data().provider;
    assert_eq!(stored, expected);

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_read_sqlite_by_dois_batch() {
    // Batch lookup by DOI must:
    //  * accept both bare DOIs ("10.1234/a") and DOI URLs,
    //  * return exactly the requested records, each with its own payload,
    //  * return nothing for unknown DOIs or an empty request.
    let dir = std::env::temp_dir().join("commonmeta_sqlite_test_dois");
    std::fs::create_dir_all(&dir).unwrap();
    let path = dir.join("out.sqlite3");

    // Build a distinct work per suffix so payload mix-ups are detectable.
    let work = |suffix: &str, title: &str| {
        let mut d = sample_data();
        d.id = format!("https://doi.org/10.1234/{suffix}");
        d.title = title.to_string();
        d
    };
    write_sqlite(
        &[work("a", "Work A"), work("b", "Work B"), work("c", "Work C")],
        &path,
    )
    .unwrap();

    let dois = vec![
        "10.1234/a".to_string(),
        "https://doi.org/10.1234/c".to_string(),
    ];
    let results = read_sqlite_by_dois(&dois, &path).unwrap();
    assert_eq!(results.len(), 2);

    // Looking up by id and checking the title verifies that each returned
    // record is the full record for that DOI, not merely a matching id.
    let title_of = |id: &str| -> Option<String> {
        results.iter().find(|d| d.id == id).map(|d| d.title.clone())
    };
    assert_eq!(title_of("https://doi.org/10.1234/a"), Some("Work A".to_string()));
    assert_eq!(title_of("https://doi.org/10.1234/c"), Some("Work C".to_string()));
    assert_eq!(
        title_of("https://doi.org/10.1234/b"),
        None,
        "unrequested record must not be returned"
    );

    let missing = read_sqlite_by_dois(&["10.9999/x".to_string()], &path).unwrap();
    assert!(missing.is_empty());
    let empty = read_sqlite_by_dois(&[], &path).unwrap();
    assert!(empty.is_empty());

    std::fs::remove_dir_all(&dir).ok();
}
#[test]
fn test_write_sqlite_replaces_existing_file() {
    // Writing the same one-record batch twice to the same path must leave
    // exactly one row, i.e. the second write replaces rather than appends.
    let tmp_dir = std::env::temp_dir().join("commonmeta_sqlite_test_replace");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");
    for _ in 0..2 {
        write_sqlite(&[sample_data()], &db_path).unwrap();
    }

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    let rows: i64 = conn
        .query_row("SELECT COUNT(rowid) FROM works", [], |row| row.get(0))
        .unwrap();
    assert_eq!(rows, 1);

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_write_sqlite_creates_works_fts_virtual_table() {
    // `write_sqlite` alone, without an FTS rebuild, must already create `works_fts`.
    let tmp_dir = std::env::temp_dir().join("commonmeta_fts_ddl_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");
    write_sqlite(&[sample_data()], &db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    let table_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='works_fts'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        table_count, 1,
        "works_fts virtual table should be created by SQLITE_DDL"
    );

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_rebuild_works_fts_populates_index() {
    // After `rebuild_works_fts`, a keyword from the title is searchable and an
    // unrelated term produces no hits.
    let tmp_dir = std::env::temp_dir().join("commonmeta_fts_rebuild_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");

    let mut record = sample_data();
    record.title = "Quantum entanglement in photonic systems".to_string();
    write_sqlite(&[record], &db_path).unwrap();
    rebuild_works_fts(&db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    // Runs a single-value COUNT query and returns the count.
    let count = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };
    assert_eq!(
        count("SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'entanglement'"),
        1,
        "FTS5 should find the record by title keyword"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'blockchain'"),
        0,
        "FTS5 should return no hits for absent term"
    );

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_works_fts_matches_subjects() {
    // A term that appears only in the subjects (not the title) must still be
    // found by a full-text query.
    let tmp_dir = std::env::temp_dir().join("commonmeta_fts_subjects_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");

    let mut record = sample_data();
    record.title = "Some Paper".to_string();
    record.subjects = ["Machine Learning", "Neural Networks"]
        .iter()
        .map(|name| crate::data::Subject {
            subject: name.to_string(),
            ..Default::default()
        })
        .collect();
    write_sqlite(&[record], &db_path).unwrap();
    rebuild_works_fts(&db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    let hits: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'neural'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(hits, 1, "FTS5 should match on subjects column");

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_works_fts_diacritic_folding() {
    // ASCII queries must match accented words in the indexed title, so the
    // FTS tokenizer has to fold diacritics ("Müller" -> "muller", "Über" -> "uber").
    let tmp_dir = std::env::temp_dir().join("commonmeta_fts_diacritic_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");

    let mut record = sample_data();
    record.title = "Über die Müller-Effekte in der Physik".to_string();
    write_sqlite(&[record], &db_path).unwrap();
    rebuild_works_fts(&db_path).unwrap();

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    // Runs a single-value COUNT query and returns the count.
    let count = |sql: &str| -> i64 { conn.query_row(sql, [], |row| row.get(0)).unwrap() };
    assert_eq!(
        count("SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'muller'"),
        1,
        "diacritic-stripped query 'muller' should match 'Müller'"
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'uber'"),
        1,
        "diacritic-stripped query 'uber' should match 'Über'"
    );

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
#[test]
fn test_rebuild_works_fts_is_idempotent() {
    // Rebuilding the FTS index twice must not duplicate entries: a unique
    // title keyword still yields exactly one hit.
    let tmp_dir = std::env::temp_dir().join("commonmeta_fts_idempotent_test");
    std::fs::create_dir_all(&tmp_dir).unwrap();
    let db_path = tmp_dir.join("out.sqlite3");

    let mut record = sample_data();
    record.title = "Idempotency in distributed systems".to_string();
    write_sqlite(&[record], &db_path).unwrap();
    for _ in 0..2 {
        rebuild_works_fts(&db_path).unwrap();
    }

    let conn = rusqlite::Connection::open(&db_path).unwrap();
    let hits: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM works_fts WHERE works_fts MATCH 'idempotency'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(hits, 1, "double rebuild should leave exactly one hit");

    // Release the database handle before deleting the directory that holds it.
    drop(conn);
    std::fs::remove_dir_all(&tmp_dir).ok();
}
}