use std::path::Path;
use serde::Serialize;
use serde_json::Value;
use url::Url;
use fluent_uri::Uri as FUri;
use crate::data::Data;
use crate::error::{sqlite_err, Error, Result};
use crate::schema_utils::json_schema_errors;
use crate::utils::{normalize_id, normalize_ror};
/// Schema URL of commonmeta v1.0. `prepare` stamps it into `schema_version` and
/// `looks_like_v1` accepts any document whose `schema_version` equals it.
const COMMONMETA_V1_SCHEMA_URL: &str = "https://commonmeta.org/commonmeta_v1.0.json";
pub fn read(json: &str) -> Result<Data> {
let value: Value = serde_json::from_str(json).map_err(|e| Error::Parse(e.to_string()))?;
if !looks_like_v1(&value) {
return Err(Error::Parse(
"commonmeta input is not schema v1.0 shaped".to_string(),
));
}
serde_json::from_value(value).map_err(|e| Error::Parse(e.to_string()))
}
/// Returns `true` for identifier types that `prepare` writes through unchanged.
/// Anything else is rewritten to `"Other"` (with the raw value kept in `scheme`).
fn is_known_identifier_type(kind: &str) -> bool {
    matches!(
        kind,
        "ARK" | "arXiv" | "article_id" | "Bibcode" | "DOI" | "Handle" | "ISBN" | "ISSN"
            | "OpenAlex" | "PMID" | "PMCID" | "PURL" | "RAiD" | "SWHID"
            | "URL" | "URN" | "UUID" | "GUID" | "Other"
    )
}

/// Empties `id` when it is set but `normalize_ror` returns nothing for it.
fn clear_unless_ror(id: &mut String) {
    if !id.is_empty() && normalize_ror(&*id).is_empty() {
        id.clear();
    }
}

/// Empties `value` when it is set but does not parse as a URI.
fn clear_unless_uri(value: &mut String) {
    if !value.is_empty() && FUri::parse(value.as_str()).is_err() {
        value.clear();
    }
}

/// Rewrites a non-empty reference id to its normalized URL form without a single
/// trailing `/`, or empties it when normalization, URL parsing or URI parsing fails.
fn sanitize_reference_id(id: &mut String) {
    if id.is_empty() {
        return;
    }
    let normalized = normalize_id(&*id);
    let candidate = if normalized.is_empty() {
        None
    } else {
        Url::parse(&normalized).ok().map(|u| {
            let mut s = u.to_string();
            if s.ends_with('/') {
                s.pop();
            }
            s
        })
    };
    *id = candidate
        .filter(|c| !c.is_empty() && FUri::parse(c.as_str()).is_ok())
        .unwrap_or_default();
}

/// Builds the copy of `data` that is serialized by the writers in this module.
///
/// The input is cloned and the clone is cleaned up: the schema version is stamped,
/// ids are normalized, empty or unparseable values are cleared, entries that end up
/// without content are dropped, and identifier types outside the known list are
/// mapped to `"Other"`. The input itself is never modified.
fn prepare(data: &Data) -> Data {
    let mut out = data.clone();
    out.schema_version = COMMONMETA_V1_SCHEMA_URL.to_string();
    if !out.id.is_empty() {
        out.id = normalize_id(&out.id);
    }
    if out.type_.is_empty() {
        out.type_ = "Other".to_string();
    }

    // Drop empty affiliations and empty organizations first, so the `retain`
    // below can remove contributors that are left without a payload.
    for c in &mut out.contributors {
        if let Some(p) = c.person.as_mut() {
            p.affiliations.retain(|a| !a.id.is_empty() || !a.name.is_empty());
        }
        if c
            .organization
            .as_ref()
            .is_some_and(|org| org.id.is_empty() && org.name.is_empty())
        {
            c.organization = None;
        }
    }
    out.contributors.retain(|c| match c.type_.as_str() {
        "Person" => c.person.is_some(),
        "Organization" => c.organization.is_some(),
        _ => true,
    });

    // The record's own id is not repeated among its identifiers.
    out.identifiers.retain(|i| i.identifier != out.id);

    // These reference fields are always emitted empty.
    for r in &mut out.references {
        r.publisher.clear();
        r.publication_year.clear();
        r.volume.clear();
        r.issue.clear();
        r.first_page.clear();
        r.last_page.clear();
        r.unstructured.clear();
    }

    // Publisher, affiliation and organization ids survive only if `normalize_ror`
    // accepts them. This runs after the emptiness checks above, so an entry whose
    // only content was a rejected id is kept (with the id cleared).
    clear_unless_ror(&mut out.publisher.id);
    for c in &mut out.contributors {
        if let Some(p) = &mut c.person {
            for aff in &mut p.affiliations {
                clear_unless_ror(&mut aff.id);
            }
        }
        if let Some(org) = &mut c.organization {
            clear_unless_ror(&mut org.id);
        }
    }

    for r in &mut out.references {
        sanitize_reference_id(&mut r.id);
    }
    out.references.retain(|r| {
        !r.id.is_empty() || !r.key.is_empty() || !r.reference.is_empty() || !r.title.is_empty()
    });

    clear_unless_uri(&mut out.license.url);
    out.files
        .retain(|f| !f.url.is_empty() && FUri::parse(f.url.as_str()).is_ok());
    clear_unless_uri(&mut out.url);

    // De-duplicate geo locations by their JSON form, keeping the first occurrence.
    {
        let mut seen = std::collections::HashSet::new();
        out.geo_locations.retain(|g| {
            let key = serde_json::to_string(g).unwrap_or_default();
            seen.insert(key)
        });
    }

    // Unknown identifier types become "Other"; the raw type moves into `scheme`
    // unless a scheme is already set.
    for id in &mut out.identifiers {
        if !is_known_identifier_type(id.identifier_type.as_str()) {
            if id.scheme.is_empty() {
                id.scheme = std::mem::take(&mut id.identifier_type);
            }
            id.identifier_type = "Other".to_string();
        }
    }

    // Same mapping for the container, with two differences: EISSN/PISSN collapse
    // to ISSN, and an empty type stays empty.
    {
        let raw = std::mem::take(&mut out.container.identifier_type);
        out.container.identifier_type = if matches!(raw.as_str(), "EISSN" | "PISSN") {
            "ISSN".to_string()
        } else if raw.is_empty() || is_known_identifier_type(raw.as_str()) {
            raw
        } else {
            if out.container.scheme.is_empty() {
                out.container.scheme = raw;
            }
            "Other".to_string()
        };
    }

    out.additional_titles.retain(|t| !t.title.is_empty());

    for f in &mut out.funding_references {
        clear_unless_uri(&mut f.funder_id);
    }
    out.funding_references.retain(|f| {
        !f.funder_name.is_empty()
            || !f.funder_id.is_empty()
            || !f.award_number.is_empty()
            || !f.award_title.is_empty()
    });

    for rel in &mut out.relations {
        let normalized = normalize_id(&rel.id);
        rel.id = normalized;
    }
    out.relations.retain(|r| !r.id.is_empty());
    out
}
pub fn write(data: &Data) -> Result<Vec<u8>> {
let out = prepare(data);
let bytes = serde_json::to_vec(&out).map_err(|e| Error::Serialize(e.to_string()))?;
json_schema_errors(&bytes, Some("commonmeta"))?;
Ok(bytes)
}
pub fn write_all(list: &[Data]) -> Result<Vec<u8>> {
let prepared: Vec<Data> = list.iter().map(prepare).collect();
let bytes =
serde_json::to_vec_pretty(&prepared).map_err(|e| Error::Serialize(e.to_string()))?;
json_schema_errors(&bytes, Some("commonmeta"))?;
Ok(bytes)
}
fn looks_like_v1(value: &Value) -> bool {
let Some(obj) = value.as_object() else {
return false;
};
obj.get("schema_version").and_then(Value::as_str) == Some(COMMONMETA_V1_SCHEMA_URL)
|| obj.contains_key("date_published")
|| obj.contains_key("additional_titles")
|| obj.contains_key("additional_descriptions")
|| obj
.get("identifiers")
.and_then(Value::as_array)
.and_then(|ids| ids.first())
.and_then(Value::as_object)
.is_some_and(|id_obj| id_obj.contains_key("identifier_type"))
|| obj
.get("contributors")
.and_then(Value::as_array)
.and_then(|contributors| contributors.first())
.and_then(Value::as_object)
.is_some_and(|contributor| {
contributor.contains_key("person") || contributor.contains_key("organization")
})
}
/// Flat, string-typed projection of a [`Data`] record used as the Parquet row type.
///
/// Rows are produced by `flatten_row` and turned back into [`Data`] by
/// `unflatten_row`. The reader/writer derives generate the Parquet schema from
/// these fields.
#[derive(
Debug,
Default,
Clone,
Serialize,
parquet_derive::ParquetRecordWriter,
parquet_derive::ParquetRecordReader,
)]
pub struct CommonmetaRow {
pub id: String,
pub record_type: String,
pub title: String,
pub url: String,
/// First identifier of type `DOI`; otherwise the record id when it contains
/// `doi.org`; otherwise empty.
pub doi: String,
pub publisher: String,
pub language: String,
pub version: String,
pub license: String,
pub container_title: String,
pub container_type: String,
pub volume: String,
pub issue: String,
pub first_page: String,
pub last_page: String,
pub date_published: String,
pub date_created: String,
pub date_updated: String,
/// Number of contributors on the source record.
pub contributor_count: i32,
/// Name and id of the first contributor, empty when there are none.
pub first_author_name: String,
pub first_author_orcid: String,
/// Subject strings joined with `"; "`.
pub subjects: String,
pub description: String,
pub provider: String,
pub additional_type: String,
/// Full JSON serialization of the source record; `unflatten_row` prefers it over
/// the flat columns when it parses.
pub json: String,
}
/// Projects a [`Data`] record onto the flat [`CommonmetaRow`] columns.
///
/// * `doi` is the first identifier typed `DOI`, falling back to the record id when
///   that id contains `doi.org`.
/// * The first contributor supplies `first_author_name` and `first_author_orcid`.
/// * Subjects are joined with `"; "`.
/// * `json` carries the full serialized record (empty if serialization fails).
fn flatten_row(data: &Data) -> CommonmetaRow {
    let doi = match data.identifiers.iter().find(|i| i.identifier_type == "DOI") {
        Some(found) => found.identifier.clone(),
        None if data.id.contains("doi.org") => data.id.clone(),
        None => String::new(),
    };

    let (first_author_name, first_author_orcid) = match data.contributors.first() {
        Some(lead) => (lead.name(), lead.id().to_string()),
        None => Default::default(),
    };

    let mut subjects = String::new();
    for (position, entry) in data.subjects.iter().enumerate() {
        if position > 0 {
            subjects.push_str("; ");
        }
        subjects.push_str(entry.subject.as_str());
    }

    let container = &data.container;
    CommonmetaRow {
        id: data.id.clone(),
        record_type: data.type_.clone(),
        title: data.title.clone(),
        url: data.url.clone(),
        doi,
        publisher: data.publisher.name.clone(),
        language: data.language.clone(),
        version: data.version.clone(),
        license: data.license.id.clone(),
        container_title: container.title.clone(),
        container_type: container.type_.clone(),
        volume: container.volume.clone(),
        issue: container.issue.clone(),
        first_page: container.first_page.clone(),
        last_page: container.last_page.clone(),
        date_published: data.date_published.clone(),
        date_created: data.dates.created.clone(),
        date_updated: data.date_updated.clone(),
        contributor_count: data.contributors.len() as i32,
        first_author_name,
        first_author_orcid,
        subjects,
        description: data.description.clone(),
        provider: data.provider.clone(),
        additional_type: data.additional_type.clone(),
        json: serde_json::to_string(data).unwrap_or_default(),
    }
}
/// Number of records per Parquet row group used by `write_parquet_all`.
const ROW_GROUP_SIZE: usize = 100_000;
/// Encodes `list` as an in-memory Parquet file, using row groups of
/// [`ROW_GROUP_SIZE`] records. Delegates to `write_parquet_chunked`.
pub fn write_parquet_all(list: &[Data]) -> Result<Vec<u8>> {
write_parquet_chunked(list, ROW_GROUP_SIZE)
}
/// Encodes `list` as an in-memory Parquet file with one row group per chunk of
/// `row_group_size` records.
///
/// Each chunk is flattened on its own scoped thread; the row groups are then
/// written sequentially in input order. An empty `list` still produces a valid
/// file containing a single empty row group.
///
/// A `row_group_size` of 0 is treated as 1, because `slice::chunks` panics on a
/// zero chunk size.
///
/// # Errors
/// Returns [`Error::Serialize`] when a flatten thread panics or when the Parquet
/// writer reports an error.
fn write_parquet_chunked(list: &[Data], row_group_size: usize) -> Result<Vec<u8>> {
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::record::RecordWriter;

    let chunks: Vec<&[Data]> = if list.is_empty() {
        // Keep one (empty) chunk so a schema can be derived and a row group written.
        vec![&[][..]]
    } else {
        list.chunks(row_group_size.max(1)).collect()
    };

    let row_chunks: Vec<Vec<CommonmetaRow>> = std::thread::scope(|scope| {
        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| scope.spawn(move || chunk.iter().map(flatten_row).collect::<Vec<_>>()))
            .collect();
        handles
            .into_iter()
            .map(|h| {
                h.join()
                    .map_err(|_| Error::Serialize("parquet flatten thread panicked".to_string()))
            })
            .collect::<Result<Vec<_>>>()
    })?;

    // `row_chunks` is never empty: `chunks` always holds at least one slice.
    let schema = row_chunks[0]
        .as_slice()
        .schema()
        .map_err(|e| Error::Serialize(e.to_string()))?;
    let props = std::sync::Arc::new(WriterProperties::builder().build());
    let buffer: Vec<u8> = Vec::new();
    let mut writer = SerializedFileWriter::new(buffer, schema, props)
        .map_err(|e| Error::Serialize(e.to_string()))?;

    for rows in &row_chunks {
        let mut row_group = writer
            .next_row_group()
            .map_err(|e| Error::Serialize(e.to_string()))?;
        rows.as_slice()
            .write_to_row_group(&mut row_group)
            .map_err(|e| Error::Serialize(e.to_string()))?;
        row_group
            .close()
            .map_err(|e| Error::Serialize(e.to_string()))?;
    }

    writer
        .into_inner()
        .map_err(|e| Error::Serialize(e.to_string()))
}
/// Rebuilds a [`Data`] record from a Parquet row.
///
/// The embedded `json` column is used when it is non-empty and deserializes
/// cleanly; otherwise the record is reconstructed from the flat columns via
/// `unflatten_row_lossy`.
fn unflatten_row(row: &CommonmetaRow) -> Data {
    if row.json.is_empty() {
        return unflatten_row_lossy(row);
    }
    match serde_json::from_str::<Data>(&row.json) {
        Ok(record) => record,
        Err(_) => unflatten_row_lossy(row),
    }
}
/// Reconstructs a [`Data`] record from the flat columns only.
///
/// This is the fallback for rows whose `json` column is missing or unreadable,
/// so anything not represented by a column is left at its default.
///
/// * A non-empty `doi` becomes a single `DOI` identifier.
/// * `subjects` is split on `"; "`.
/// * If a first-author name or id is present, one person contributor is created.
///   The name is split at its last space into given and family name; a name
///   without a space is stored as the family name. This is a best-effort inverse
///   of the `"Given Family"` form that `flatten_row` stores.
fn unflatten_row_lossy(row: &CommonmetaRow) -> Data {
    let identifiers = if row.doi.is_empty() {
        Vec::new()
    } else {
        vec![crate::data::Identifier {
            identifier: row.doi.clone(),
            identifier_type: "DOI".to_string(),
            ..Default::default()
        }]
    };

    let contributors = if row.first_author_name.is_empty() && row.first_author_orcid.is_empty() {
        Vec::new()
    } else {
        // Restore the author name; previously it was checked here but dropped,
        // leaving an unnamed person.
        let full_name = row.first_author_name.trim();
        let (given_name, family_name) = match full_name.rsplit_once(' ') {
            Some((given, family)) => (given.trim_end().to_string(), family.to_string()),
            None => (String::new(), full_name.to_string()),
        };
        vec![crate::data::Contributor::person(
            crate::data::Person {
                id: row.first_author_orcid.clone(),
                given_name,
                family_name,
                ..Default::default()
            },
            Vec::new(),
        )]
    };

    let subjects = row
        .subjects
        .split("; ")
        .filter(|s| !s.is_empty())
        .map(|s| crate::data::Subject {
            subject: s.to_string(),
            ..Default::default()
        })
        .collect();

    Data {
        id: row.id.clone(),
        type_: row.record_type.clone(),
        additional_type: row.additional_type.clone(),
        title: row.title.clone(),
        url: row.url.clone(),
        identifiers,
        publisher: crate::data::Publisher {
            name: row.publisher.clone(),
            ..Default::default()
        },
        language: row.language.clone(),
        version: row.version.clone(),
        license: crate::data::License {
            id: row.license.clone(),
            ..Default::default()
        },
        container: crate::data::Container {
            title: row.container_title.clone(),
            type_: row.container_type.clone(),
            volume: row.volume.clone(),
            issue: row.issue.clone(),
            first_page: row.first_page.clone(),
            last_page: row.last_page.clone(),
            ..Default::default()
        },
        date_published: row.date_published.clone(),
        date_updated: row.date_updated.clone(),
        dates: crate::data::Dates {
            created: row.date_created.clone(),
            ..Default::default()
        },
        contributors,
        subjects,
        description: row.description.clone(),
        provider: row.provider.clone(),
        ..Default::default()
    }
}
/// Schema bootstrap batch executed by `init_sqlite_writer`.
///
/// Sets `synchronous=NORMAL` and creates the `settings` and `works` tables plus
/// the `works` indexes. Every statement uses `IF NOT EXISTS`, so running the
/// batch against an existing database leaves existing objects untouched. The
/// identifier indexes (`pmid`, `pmcid`, `openalex`, `arxiv`) are partial and
/// skip rows with an empty value.
const SQLITE_DDL: &str = r#"PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS settings (
"key" TEXT PRIMARY KEY NOT NULL,
"value" TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"pmid" TEXT NOT NULL DEFAULT '',
"pmcid" TEXT NOT NULL DEFAULT '',
"openalex" TEXT NOT NULL DEFAULT '',
"arxiv" TEXT NOT NULL DEFAULT '',
"valid" INTEGER NOT NULL DEFAULT 0,
"metadata" BLOB NOT NULL DEFAULT x''
);
CREATE INDEX IF NOT EXISTS works_type ON works("type");
CREATE INDEX IF NOT EXISTS works_date_published ON works("date_published");
CREATE INDEX IF NOT EXISTS works_date_updated ON works("date_updated");
CREATE INDEX IF NOT EXISTS works_provider ON works("provider");
CREATE INDEX IF NOT EXISTS works_pmid ON works("pmid") WHERE "pmid" != '';
CREATE INDEX IF NOT EXISTS works_pmcid ON works("pmcid") WHERE "pmcid" != '';
CREATE INDEX IF NOT EXISTS works_openalex ON works("openalex") WHERE "openalex" != '';
CREATE INDEX IF NOT EXISTS works_arxiv ON works("arxiv") WHERE "arxiv" != '';"#;
/// Schema version stored in `PRAGMA user_version` once migrations have run.
/// Databases reporting a lower value are migrated by `init_sqlite_writer` and
/// `run_migrations`.
const SCHEMA_VERSION: u32 = 2;
/// Statements that bring an older `works` table up to [`SCHEMA_VERSION`].
///
/// Callers dispatch on the statement prefix: `ALTER TABLE` entries run first and
/// tolerate "duplicate column name"; the remaining entries run afterwards and
/// tolerate "already exists".
// NOTE(review): there is no entry adding the "valid" column that SQLITE_DDL and
// SQLITE_INSERT use; `validate_sqlite` adds it ad hoc. Confirm older databases
// cannot reach an insert without it.
const SQLITE_MIGRATIONS: &[&str] = &[
r#"ALTER TABLE works ADD COLUMN "pmid" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "pmcid" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "openalex" TEXT NOT NULL DEFAULT ''"#,
r#"ALTER TABLE works ADD COLUMN "arxiv" TEXT NOT NULL DEFAULT ''"#,
r#"CREATE INDEX IF NOT EXISTS works_arxiv ON works("arxiv") WHERE "arxiv" != ''"#,
];
/// Insert statement used by `write_rows_in_tx`. `INSERT OR REPLACE` means a row
/// with an existing `id` is replaced. Parameters `?1`..`?15` follow the column
/// list order.
const SQLITE_INSERT: &str = r#"INSERT OR REPLACE INTO works (
"id", "type", "url", "title", "subjects",
"language", "date_published", "date_updated", "provider",
"pmid", "pmcid", "openalex", "arxiv", "valid", "metadata"
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)"#;
/// Update statement used by `update_rows_in_tx`. It takes the same fifteen
/// parameters in the same order as [`SQLITE_INSERT`], with `?1` selecting the
/// row by `id`; ids that do not exist are left alone (no row is inserted).
const SQLITE_UPDATE: &str = r#"UPDATE works SET
"type" = ?2, "url" = ?3, "title" = ?4, "subjects" = ?5,
"language" = ?6, "date_published" = ?7, "date_updated" = ?8, "provider" = ?9,
"pmid" = ?10, "pmcid" = ?11, "openalex" = ?12, "arxiv" = ?13, "valid" = ?14, "metadata" = ?15
WHERE "id" = ?1"#;
/// Column values for one row of the `works` table, as produced by
/// `serialize_to_row` and bound by `write_rows_in_tx` / `update_rows_in_tx`.
pub struct PreparedRow {
pub id: String,
pub type_: String,
pub url: String,
pub title: String,
/// JSON serialization of the record's subjects.
pub subjects: String,
pub language: String,
pub date_published: String,
pub date_updated: String,
pub provider: String,
/// Normalized identifier columns; empty when the record has no identifier of
/// that type or normalization yields nothing.
pub pmid: String,
pub pmcid: String,
pub openalex: String,
pub arxiv: String,
/// Written to the `valid` column as 0/1. `serialize_to_row` always sets `false`.
pub valid: bool,
/// zstd-compressed JSON of the prepared record (the uncompressed JSON bytes if
/// compression fails).
pub metadata: Vec<u8>,
}
pub fn serialize_to_row(data: Data) -> PreparedRow {
let data = prepare(&data);
let subjects = serde_json::to_string(&data.subjects).unwrap_or_default();
let pmid = data.identifiers.iter()
.find(|i| i.identifier_type == "PMID")
.and_then(|i| crate::utils::normalize_pmid(&i.identifier, crate::utils::PmcResolver::Ncbi))
.unwrap_or_default();
let pmcid = data.identifiers.iter()
.find(|i| i.identifier_type == "PMCID")
.and_then(|i| crate::utils::normalize_pmcid(&i.identifier, crate::utils::PmcResolver::Ncbi))
.unwrap_or_default();
let openalex = data.identifiers.iter()
.find(|i| i.identifier_type == "OpenAlex")
.map(|i| i.identifier.clone())
.unwrap_or_default();
let arxiv = data.identifiers.iter()
.find(|i| i.identifier_type == "arXiv")
.and_then(|i| crate::utils::normalize_arxiv(&i.identifier))
.unwrap_or_default();
let json = serde_json::to_string(&data).unwrap_or_default();
let metadata = zstd::encode_all(json.as_bytes(), 0).unwrap_or_else(|_| json.into_bytes());
PreparedRow {
id: data.id,
type_: data.type_,
url: data.url,
title: data.title,
subjects,
language: data.language,
date_published: data.date_published,
date_updated: data.date_updated,
provider: data.provider,
pmid,
pmcid,
openalex,
arxiv,
valid: false,
metadata,
}
}
pub(crate) fn init_sqlite_writer(path: &Path, overwrite: bool) -> Result<rusqlite::Connection> {
if overwrite && path.exists() {
std::fs::remove_file(path)
.map_err(|e| Error::Parse(format!("failed to remove '{}': {}", path.display(), e)))?;
}
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open sqlite '{}': {}", path.display(), e)))?;
let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
.map_err(|e| Error::Parse(format!("failed to set WAL mode: {}", e)))?;
let version: u32 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
let works_exists: bool = conn
.query_row(
"SELECT COUNT(rowid) FROM sqlite_master WHERE type='table' AND name='works'",
[],
|r| r.get::<_, i64>(0),
)
.unwrap_or(0) > 0;
if version < SCHEMA_VERSION && works_exists {
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("ALTER TABLE")) {
if let Err(e) = conn.execute(stmt, []) {
if !e.to_string().contains("duplicate column name") {
return Err(Error::Parse(format!("schema migration: {e}")));
}
}
}
}
conn.execute_batch(SQLITE_DDL)
.map_err(|e| Error::Parse(format!("failed to create works table: {}", e)))?;
if version < SCHEMA_VERSION {
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| !s.starts_with("ALTER TABLE")) {
if let Err(e) = conn.execute(stmt, []) {
let msg = e.to_string();
if !msg.contains("duplicate column name") && !msg.contains("already exists") {
return Err(Error::Parse(format!("schema migration: {e}")));
}
}
}
conn.execute_batch(&format!("PRAGMA user_version = {SCHEMA_VERSION}"))
.map_err(|e| Error::Parse(format!("set user_version: {e}")))?;
}
Ok(conn)
}
pub fn run_migrations(path: &Path) -> Result<(usize, u32)> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open sqlite '{}': {}", path.display(), e)))?;
let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
.map_err(|e| Error::Parse(format!("failed to set WAL mode: {}", e)))?;
conn.execute_batch("PRAGMA cache_size=-65536; PRAGMA mmap_size=4294967296;")
.map_err(|e| Error::Parse(format!("failed to set cache pragmas: {}", e)))?;
let version: u32 = conn
.query_row("PRAGMA user_version", [], |r| r.get(0))
.unwrap_or(0);
if version >= SCHEMA_VERSION {
return Ok((0, version));
}
let mut applied = 0;
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("ALTER TABLE")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", stmt);
match conn.execute(stmt, []) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("duplicate column name") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
{
let ck_start = std::time::Instant::now();
eprint!(" PRAGMA wal_checkpoint(PASSIVE) … ");
let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE)");
eprintln!("{:.1?}", ck_start.elapsed());
}
for stmt in SQLITE_MIGRATIONS.iter().filter(|s| s.starts_with("CREATE INDEX")) {
let step_start = std::time::Instant::now();
eprint!(" {} … ", stmt);
match conn.execute(stmt, []) {
Ok(_) => {
applied += 1;
eprintln!("{:.1?}", step_start.elapsed());
}
Err(e) if e.to_string().contains("already exists") => {
eprintln!("already present, skipped");
}
Err(e) => return Err(Error::Parse(format!("schema migration: {e}"))),
}
}
conn.execute_batch(&format!("PRAGMA user_version = {SCHEMA_VERSION}"))
.map_err(|e| Error::Parse(format!("set user_version: {e}")))?;
Ok((applied, SCHEMA_VERSION))
}
/// Inserts (or replaces) `rows` in the `works` table inside the caller's
/// transaction, using [`SQLITE_INSERT`]. The caller is responsible for
/// committing. An empty slice is a no-op and does not prepare the statement.
///
/// # Errors
/// Returns the error built by `sqlite_err` when the statement cannot be prepared
/// or an insert fails; the message names the offending row id.
pub(crate) fn write_rows_in_tx(
    tx: &rusqlite::Transaction,
    rows: &[PreparedRow],
) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let mut stmt = tx
        .prepare(SQLITE_INSERT)
        .map_err(|e| sqlite_err(e, "failed to prepare insert"))?;
    for row in rows {
        // The id is only borrowed for the error message; cloning it for every
        // row was a needless allocation on the bulk-insert path.
        stmt.execute(rusqlite::params![
            row.id, row.type_, row.url, row.title, row.subjects,
            row.language, row.date_published, row.date_updated, row.provider,
            row.pmid, row.pmcid, row.openalex, row.arxiv,
            row.valid as i32, row.metadata,
        ])
        .map_err(|e| sqlite_err(e, &format!("failed to insert '{}'", row.id)))?;
    }
    Ok(())
}
/// Writes `rows` to the `works` table in a single transaction and then requests
/// a passive WAL checkpoint (whose outcome is ignored). An empty batch does
/// nothing.
///
/// # Errors
/// Returns the error built by `sqlite_err` when the transaction cannot be
/// started or committed, or any error from `write_rows_in_tx`.
pub(crate) fn write_sqlite_batch_rows(
    conn: &rusqlite::Connection,
    rows: Vec<PreparedRow>,
) -> Result<()> {
    if !rows.is_empty() {
        let tx = conn
            .unchecked_transaction()
            .map_err(|e| sqlite_err(e, "failed to begin transaction"))?;
        write_rows_in_tx(&tx, &rows)?;
        tx.commit()
            .map_err(|e| sqlite_err(e, "failed to commit transaction"))?;
        // Best effort: keep the WAL from growing across batches.
        let _ = conn.execute("PRAGMA wal_checkpoint(PASSIVE)", []);
    }
    Ok(())
}
/// Writes `data` to a SQLite database at `path`, replacing any existing file
/// (calls `write_sqlite_impl` with `overwrite = true`).
pub fn write_sqlite(data: &[Data], path: &Path) -> Result<()> {
write_sqlite_impl(data, path, true)
}
/// Writes `data` into the SQLite database at `path` without deleting it first
/// (calls `write_sqlite_impl` with `overwrite = false`). Rows are written with
/// `INSERT OR REPLACE`, so a record whose `id` already exists replaces the
/// stored row.
pub fn upsert_sqlite(data: &[Data], path: &Path) -> Result<()> {
write_sqlite_impl(data, path, false)
}
/// Shared implementation of `write_sqlite` and `upsert_sqlite`.
///
/// All records are converted to rows before the database is opened, then the
/// rows are written as one batch. `overwrite` is forwarded to
/// `init_sqlite_writer`.
fn write_sqlite_impl(data: &[Data], path: &Path, overwrite: bool) -> Result<()> {
    let mut rows: Vec<PreparedRow> = Vec::with_capacity(data.len());
    for record in data {
        rows.push(serialize_to_row(record.clone()));
    }
    let conn = init_sqlite_writer(path, overwrite)?;
    write_sqlite_batch_rows(&conn, rows)
}
pub fn count_sqlite_works(path: &Path) -> Result<usize> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(e.to_string()))?;
let n: i64 = conn
.query_row("SELECT COUNT(rowid) FROM works", [], |row| row.get(0))
.map_err(|e| Error::Parse(e.to_string()))?;
Ok(n.max(0) as usize)
}
const SQLITE_SELECT: &str = r#"SELECT "metadata" FROM works ORDER BY rowid"#;
fn read_sqlite_rows(
conn: &rusqlite::Connection,
limit: Option<usize>,
offset: usize,
) -> Result<Vec<Data>> {
let sql = match (limit, offset) {
(Some(n), o) => format!("{} LIMIT {} OFFSET {}", SQLITE_SELECT, n, o),
(None, o) if o > 0 => format!("{} LIMIT -1 OFFSET {}", SQLITE_SELECT, o),
_ => SQLITE_SELECT.to_string(),
};
let mut stmt = conn.prepare(&sql).map_err(|e| Error::Parse(e.to_string()))?;
let mut rows = stmt.query([]).map_err(|e| Error::Parse(e.to_string()))?;
let mut results = Vec::new();
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let blob: Vec<u8> = row
.get(0)
.map_err(|e| Error::Parse(format!("failed to read metadata blob: {}", e)))?;
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
results.push(data);
}
Ok(results)
}
pub fn read_sqlite_commonmeta(path: &Path, limit: Option<usize>, offset: usize) -> Result<Vec<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_rows(&conn, limit, offset)
}
/// Columns of the `works` table that support single-row lookup.
///
/// Using an enum rather than a free-form string keeps the column name
/// interpolated into lookup SQL restricted to this fixed set.
pub(crate) enum WorksColumn {
    Id,
    Pmid,
    Pmcid,
    Openalex,
    Arxiv,
}

impl WorksColumn {
    /// SQL column name for this variant.
    fn as_col_name(&self) -> &'static str {
        match *self {
            WorksColumn::Id => "id",
            WorksColumn::Pmid => "pmid",
            WorksColumn::Pmcid => "pmcid",
            WorksColumn::Openalex => "openalex",
            WorksColumn::Arxiv => "arxiv",
        }
    }
}
pub fn read_sqlite_by_id(id: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_by_column(&conn, WorksColumn::Id, id)
}
pub fn read_sqlite_by_pmid(pmid: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let key = crate::utils::normalize_pmid(pmid, crate::utils::PmcResolver::Ncbi)
.unwrap_or_else(|| pmid.to_string());
read_sqlite_by_column(&conn, WorksColumn::Pmid, &key)
}
pub fn read_sqlite_by_pmcid(pmcid: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let key = crate::utils::normalize_pmcid(pmcid, crate::utils::PmcResolver::Ncbi)
.unwrap_or_else(|| pmcid.to_string());
read_sqlite_by_column(&conn, WorksColumn::Pmcid, &key)
}
pub fn read_sqlite_by_openalex(openalex: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_by_column(&conn, WorksColumn::Openalex, openalex)
}
pub fn read_sqlite_by_arxiv(arxiv: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let key = crate::utils::normalize_arxiv(arxiv).unwrap_or_else(|| arxiv.to_string());
read_sqlite_by_column(&conn, WorksColumn::Arxiv, &key)
}
fn read_sqlite_by_column(conn: &rusqlite::Connection, col: WorksColumn, val: &str) -> Result<Option<Data>> {
let sql = format!(
r#"SELECT "metadata" FROM works WHERE "{}" = ?1 LIMIT 1"#,
col.as_col_name()
);
let result = conn.query_row(&sql, rusqlite::params![val], |row| row.get::<_, Vec<u8>>(0));
match result {
Ok(blob) => {
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
Ok(Some(data))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(Error::Parse(e.to_string())),
}
}
/// Updates existing `works` rows from `rows` inside the caller's transaction,
/// using [`SQLITE_UPDATE`], and returns how many rows were changed.
///
/// Rows whose `id` is not present change nothing and are not inserted. An empty
/// slice returns `Ok(0)` without preparing the statement.
///
/// # Errors
/// Returns the error built by `sqlite_err` when the statement cannot be prepared
/// or an update fails; the message names the offending row id.
pub(crate) fn update_rows_in_tx(
    tx: &rusqlite::Transaction,
    rows: &[PreparedRow],
) -> Result<usize> {
    if rows.is_empty() {
        return Ok(0);
    }
    let mut stmt = tx
        .prepare(SQLITE_UPDATE)
        .map_err(|e| sqlite_err(e, "failed to prepare update"))?;
    let mut updated = 0usize;
    for row in rows {
        let changed = stmt
            .execute(rusqlite::params![
                row.id, row.type_, row.url, row.title, row.subjects,
                row.language, row.date_published, row.date_updated, row.provider,
                row.pmid, row.pmcid, row.openalex, row.arxiv,
                row.valid as i32, row.metadata,
            ])
            .map_err(|e| sqlite_err(e, &format!("failed to update '{}'", row.id)))?;
        updated += changed;
    }
    Ok(updated)
}
use crate::schema_utils::collect_leaf_errors;
/// Schema validation failures for one stored record, as reported by
/// `validate_sqlite`.
pub struct ValidationError {
/// `id` of the failing `works` row.
pub id: String,
/// Error messages produced by `collect_leaf_errors` for that record.
pub errors: Vec<String>,
}
/// Summary returned by `validate_sqlite`.
pub struct ValidationReport {
/// Number of rows fetched from the database during the run.
pub total: usize,
/// Rows that passed validation, including those that passed after a repair.
pub valid: usize,
/// Number of entries in `errors`.
pub invalid: usize,
/// Rows whose repaired metadata passed validation and was queued for write-back.
pub fixed: usize,
/// One entry per record that still fails validation.
pub errors: Vec<ValidationError>,
}
pub fn validate_sqlite(
path: &Path,
provider: Option<&str>,
work_type: Option<&str>,
limit: usize,
fix: bool,
recheck: bool,
) -> Result<ValidationReport> {
use serde_json::Value;
use crate::schema_utils::SCHEMA_JSON;
let schema_json: Value = serde_json::from_str(SCHEMA_JSON)
.map_err(|e| Error::Parse(format!("failed to parse commonmeta schema: {e}")))?;
let validation_schema = {
let mut merged = serde_json::Map::new();
if let Some(v) = schema_json.get("$schema") { merged.insert("$schema".to_string(), v.clone()); }
if let Some(v) = schema_json.get("$id") { merged.insert("$id".to_string(), v.clone()); }
if let Some(v) = schema_json.get("definitions") { merged.insert("definitions".to_string(), v.clone()); }
if let Some(Value::Object(cm)) = schema_json.get("commonmeta") {
for (k, v) in cm { merged.insert(k.clone(), v.clone()); }
}
Value::Object(merged)
};
let compiled = jsonschema::validator_for(&validation_schema)
.map_err(|e| Error::Parse(format!("failed to compile commonmeta schema: {e}")))?;
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let _ = conn.execute_batch("PRAGMA cache_size=-65536; PRAGMA mmap_size=4294967296;");
conn.execute_batch(
r#"CREATE TABLE IF NOT EXISTS validation_errors (
id TEXT PRIMARY KEY,
errors TEXT NOT NULL,
checked_at TEXT NOT NULL
);"#,
).map_err(|e| Error::Parse(format!("failed to create validation_errors table: {e}")))?;
let _ = conn.execute_batch(r#"ALTER TABLE works ADD COLUMN "valid" INTEGER NOT NULL DEFAULT 0;"#);
if fix || recheck {
let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
}
let mut where_parts = Vec::new();
if recheck { where_parts.push(r#"works."valid" = 0"#); }
if provider.is_some() { where_parts.push(r#"works."provider" = ?1"#); }
if work_type.is_some() { where_parts.push(r#"works."type" = ?2"#); }
let where_sql = if where_parts.is_empty() {
String::new()
} else {
format!("WHERE {}", where_parts.join(" AND "))
};
let count_sql = format!(r#"SELECT COUNT(rowid) FROM works {where_sql}"#);
let cursor_sql = format!(
r#"SELECT works.id, works.metadata FROM works {where_sql}
ORDER BY works.rowid LIMIT ?3 OFFSET ?4"#
);
let provider_param = provider.unwrap_or("");
let type_param = work_type.unwrap_or("");
let row_count: u64 = conn
.query_row(&count_sql, rusqlite::params![provider_param, type_param], |r| r.get::<_, i64>(0))
.unwrap_or(0).max(0) as u64;
let total_to_check = if limit == 0 { row_count } else { row_count.min(limit as u64) };
let bar = crate::progress::count_bar("validating", total_to_check);
let mut stmt = conn.prepare(&cursor_sql)
.map_err(|e| Error::Parse(e.to_string()))?;
const BATCH: usize = 10_000;
let mut valid = 0usize;
let mut fixed = 0usize;
let mut report_errors: Vec<ValidationError> = Vec::new();
let mut offset = 0usize;
let mut total = 0usize;
let upsert_error_sql = r#"INSERT INTO validation_errors (id, errors, checked_at)
VALUES (?1, ?2, datetime('now'))
ON CONFLICT(id) DO UPDATE SET errors = excluded.errors, checked_at = excluded.checked_at"#;
loop {
let remaining = if limit == 0 { BATCH } else { limit.saturating_sub(total) };
if remaining == 0 { break; }
let batch_size = BATCH.min(remaining);
let batch_offset = if recheck { 0 } else { offset };
let raw_batch: Vec<(String, Vec<u8>)> = {
let mut rows = stmt
.query(rusqlite::params![provider_param, type_param, batch_size as i64, batch_offset as i64])
.map_err(|e| Error::Parse(e.to_string()))?;
let mut v = Vec::with_capacity(batch_size);
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let id: String = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
let blob: Vec<u8> = row.get(1).map_err(|e| Error::Parse(e.to_string()))?;
v.push((id, blob));
}
v
};
if raw_batch.is_empty() { break; }
let batch_count = raw_batch.len();
let mut passing_ids: Vec<String> = Vec::new();
let mut fix_updates: Vec<(Vec<u8>, String)> = Vec::new();
let mut error_pairs: Vec<(String, Vec<String>)> = Vec::new();
for (id, blob) in &raw_batch {
let decompressed = match zstd::decode_all(std::io::Cursor::new(blob)) {
Ok(d) => d,
Err(e) => { eprintln!("validate: decompress '{}': {}", id, e); continue; }
};
let doc: Value = match serde_json::from_slice(&decompressed) {
Ok(v) => v,
Err(e) => { eprintln!("validate: parse '{}': {}", id, e); continue; }
};
let raw_errs: Vec<jsonschema::ValidationError<'_>> =
compiled.iter_errors(&doc).collect();
let errs = collect_leaf_errors(&raw_errs);
if errs.is_empty() {
passing_ids.push(id.clone());
valid += 1;
} else if fix {
if let Ok(data) = serde_json::from_value::<Data>(doc) {
let repaired_row = serialize_to_row(data);
let repaired_doc: Value = serde_json::from_slice(
&zstd::decode_all(std::io::Cursor::new(&repaired_row.metadata))
.unwrap_or_default()
).unwrap_or(Value::Null);
let repaired_raw: Vec<jsonschema::ValidationError<'_>> =
compiled.iter_errors(&repaired_doc).collect();
let remaining_errs = collect_leaf_errors(&repaired_raw);
if remaining_errs.is_empty() {
fix_updates.push((repaired_row.metadata, id.clone()));
fixed += 1;
valid += 1;
} else {
error_pairs.push((id.clone(), remaining_errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: remaining_errs });
}
} else {
error_pairs.push((id.clone(), errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: errs });
}
} else {
error_pairs.push((id.clone(), errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: errs });
}
bar.inc(1);
}
let tx = conn.unchecked_transaction()
.map_err(|e| Error::Parse(format!("begin transaction: {e}")))?;
for id in &passing_ids {
tx.execute(r#"UPDATE works SET "valid" = 1 WHERE id = ?1"#, [id]).ok();
tx.execute("DELETE FROM validation_errors WHERE id = ?1", [id]).ok();
}
for (meta, id) in &fix_updates {
tx.execute(
r#"UPDATE works SET "metadata" = ?1, "valid" = 1 WHERE id = ?2"#,
rusqlite::params![meta, id],
).ok();
tx.execute("DELETE FROM validation_errors WHERE id = ?1", [id]).ok();
}
for (id, errors) in &error_pairs {
let errors_json = serde_json::to_string(errors).unwrap_or_default();
tx.execute(upsert_error_sql, rusqlite::params![id, errors_json]).ok();
}
tx.commit().map_err(|e| Error::Parse(format!("commit transaction: {e}")))?;
total += batch_count;
offset += batch_count;
if batch_count < batch_size { break; }
if recheck && passing_ids.is_empty() && fix_updates.is_empty() { break; }
}
bar.finish_and_clear();
Ok(ValidationReport {
total,
valid,
invalid: report_errors.len(),
fixed,
errors: report_errors,
})
}
pub fn read_parquet_all(bytes: &[u8]) -> Result<Vec<Data>> {
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::record::RecordReader;
let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes.to_vec()))
.map_err(|e| Error::Parse(e.to_string()))?;
let mut rows: Vec<CommonmetaRow> = Vec::new();
for i in 0..reader.num_row_groups() {
let mut row_group_reader = reader
.get_row_group(i)
.map_err(|e| Error::Parse(e.to_string()))?;
let num_rows = row_group_reader.metadata().num_rows() as usize;
rows.read_from_row_group(&mut *row_group_reader, num_rows)
.map_err(|e| Error::Parse(e.to_string()))?;
}
Ok(rows.iter().map(unflatten_row).collect())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::{Contributor, Identifier, Person};
/// Builds the fixture record shared by the tests in this module: a journal
/// article with one DOI identifier and one person contributor (Jane Doe, with
/// an ORCID id). All remaining fields keep their `Default` values.
fn sample_data() -> Data {
    let doi_identifier = Identifier {
        identifier: String::from("10.1234/abc"),
        identifier_type: String::from("DOI"),
        ..Default::default()
    };
    let author = Person {
        given_name: String::from("Jane"),
        family_name: String::from("Doe"),
        id: String::from("https://orcid.org/0000-0002-1825-0097"),
        ..Default::default()
    };

    Data {
        id: String::from("https://doi.org/10.1234/abc"),
        type_: String::from("JournalArticle"),
        title: String::from("A Sample Title"),
        identifiers: vec![doi_identifier],
        contributors: vec![Contributor::person(author, Vec::new())],
        ..Data::default()
    }
}
/// Flattening the shared fixture must expose its id, type, title, DOI and
/// first-author details, and report exactly one contributor.
#[test]
fn test_flatten_row_basic() {
    let source = sample_data();
    let flat = flatten_row(&source);

    // Core record fields.
    assert_eq!(flat.id, "https://doi.org/10.1234/abc");
    assert_eq!(flat.record_type, "JournalArticle");
    assert_eq!(flat.title, "A Sample Title");
    assert_eq!(flat.doi, "10.1234/abc");

    // First-author summary columns.
    assert_eq!(flat.first_author_name, "Jane Doe");
    let expected_orcid = "https://orcid.org/0000-0002-1825-0097";
    assert_eq!(flat.first_author_orcid, expected_orcid);
    assert_eq!(flat.contributor_count, 1);
}
/// With no identifiers on the record, the flattened `doi` column is expected
/// to carry the record id instead.
#[test]
fn test_flatten_row_doi_fallback_from_id() {
    let without_identifiers = Data {
        identifiers: Vec::new(),
        ..sample_data()
    };

    let flat = flatten_row(&without_identifiers);

    assert_eq!(flat.doi, "https://doi.org/10.1234/abc");
}
/// Writing a single record must yield non-empty output that both starts and
/// ends with the 4-byte `PAR1` marker.
#[test]
fn test_write_parquet_all_roundtrip() {
    const MAGIC: &[u8; 4] = b"PAR1";

    let records = vec![sample_data()];
    let encoded = write_parquet_all(&records).unwrap();

    assert!(!encoded.is_empty());
    let tail_start = encoded.len() - 4;
    assert_eq!(&encoded[..4], MAGIC);
    assert_eq!(&encoded[tail_start..], MAGIC);
}
/// Writing an empty record list must still succeed and produce output that
/// begins with the `PAR1` marker.
#[test]
fn test_write_parquet_all_empty() {
    let no_records: Vec<Data> = Vec::new();

    let encoded = write_parquet_all(&no_records).unwrap();

    assert_eq!(&encoded[..4], b"PAR1");
}
/// The written file must be readable by the Parquet reader, report two rows
/// for two records, and contain the expected flattened column names.
#[test]
fn test_write_parquet_all_readable_schema_and_rows() {
    use parquet::file::reader::{FileReader, SerializedFileReader};

    let records = vec![sample_data(), sample_data()];
    let encoded = write_parquet_all(&records).unwrap();

    let reader = SerializedFileReader::new(::bytes::Bytes::from(encoded)).unwrap();
    let file_meta = reader.metadata().file_metadata();
    assert_eq!(file_meta.num_rows(), 2);

    // Collect the leaf column names once, then probe for each expected one.
    let schema = file_meta.schema_descr();
    let mut column_names: Vec<String> = Vec::new();
    for idx in 0..schema.num_columns() {
        column_names.push(schema.column(idx).name().to_string());
    }

    let expected_columns = ["id", "record_type", "title", "doi", "first_author_name"];
    for expected in expected_columns.iter() {
        assert!(
            column_names.iter().any(|c| c == expected),
            "missing column {expected}"
        );
    }
}
/// Writing three records with a chunk size of 1 must produce one file with
/// three row groups and three rows, and reading it back must return three
/// records.
#[test]
fn test_write_parquet_chunked_uses_multiple_row_groups_in_one_file() {
    use parquet::file::reader::{FileReader, SerializedFileReader};

    let records = vec![sample_data(), sample_data(), sample_data()];
    let encoded = write_parquet_chunked(&records, 1).unwrap();

    // Inspect the physical layout with the low-level reader.
    let raw_copy = ::bytes::Bytes::from(encoded.clone());
    let reader = SerializedFileReader::new(raw_copy).unwrap();
    assert_eq!(reader.num_row_groups(), 3);
    assert_eq!(reader.metadata().file_metadata().num_rows(), 3);

    // Then go through the public read path.
    let decoded = read_parquet_all(&encoded).unwrap();
    assert_eq!(decoded.len(), 3);
}
/// A single record written to Parquet and read back must compare equal to the
/// original.
#[test]
fn test_write_read_parquet_roundtrip() {
    let originals = vec![sample_data()];

    let encoded = write_parquet_all(&originals).unwrap();
    let decoded = read_parquet_all(&encoded).unwrap();

    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0], originals[0]);
}
/// Extends the shared fixture with an additional title, a second contributor
/// with an affiliation, a second identifier, an additional description and two
/// subjects, then checks that all of them are still present after a Parquet
/// write/read cycle.
#[test]
fn test_write_read_parquet_roundtrip_preserves_fields_outside_flattened_view() {
    use crate::data::{Affiliation, Description, Subject, Title};

    // Build the extra pieces first so the mutation of the fixture reads as a list.
    let alternative_title = Title {
        title: "An Alternative Title".to_string(),
        type_: "TranslatedTitle".to_string(),
        ..Default::default()
    };
    let university = Affiliation {
        id: "https://ror.org/02catss52".to_string(),
        name: "Example University".to_string(),
        ..Default::default()
    };
    let second_author = Contributor::person(
        Person {
            given_name: "John".to_string(),
            family_name: "Smith".to_string(),
            affiliations: vec![university],
            ..Default::default()
        },
        Vec::new(),
    );
    let issn = Identifier {
        identifier: "1234-5678".to_string(),
        identifier_type: "ISSN".to_string(),
        ..Default::default()
    };
    let second_description = Description {
        description: "A second description".to_string(),
        type_: "TechnicalInfo".to_string(),
        ..Default::default()
    };
    let subjects: Vec<Subject> = ["Biology", "Chemistry"]
        .iter()
        .map(|&name| Subject {
            subject: name.to_string(),
            ..Default::default()
        })
        .collect();

    let mut data = sample_data();
    data.additional_titles.push(alternative_title);
    data.contributors.push(second_author);
    data.identifiers.push(issn);
    data.additional_descriptions.push(second_description);
    data.subjects = subjects;

    let encoded = write_parquet_all(std::slice::from_ref(&data)).unwrap();
    let decoded = read_parquet_all(&encoded).unwrap();

    assert_eq!(decoded.len(), 1);
    let restored = &decoded[0];
    assert_eq!(restored, &data);

    // Spot-check the individual collections as well.
    assert_eq!(restored.additional_titles.len(), 1);
    assert_eq!(restored.contributors.len(), 2);
    assert_eq!(
        restored.contributors[1].affiliations()[0].name,
        "Example University"
    );
    assert_eq!(restored.identifiers.len(), 2);
    assert_eq!(restored.additional_descriptions.len(), 1);
    assert_eq!(restored.subjects.len(), 2);
}
/// A Parquet file written from an empty record list must read back as an
/// empty list.
#[test]
fn test_read_parquet_all_empty() {
    let no_records: Vec<Data> = Vec::new();

    let encoded = write_parquet_all(&no_records).unwrap();
    let decoded = read_parquet_all(&encoded).unwrap();

    assert!(decoded.is_empty());
}
/// `write_sqlite` must create a `works` table with one row for the fixture,
/// exposing `id`, `title` and `type` columns and a zstd-compressed JSON
/// `metadata` blob that still lists the single contributor.
#[test]
fn test_write_sqlite_creates_works_table() {
    /// Removes the scratch directory when dropped, so cleanup also happens
    /// when an assertion below panics.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    // The system temp directory is shared between processes, so the directory
    // name carries the process id to keep concurrent test runs apart.
    let scratch = ScratchDir(
        std::env::temp_dir().join(format!("commonmeta_sqlite_test_{}", std::process::id())),
    );
    std::fs::create_dir_all(&scratch.0).unwrap();
    let path = scratch.0.join("out.sqlite3");
    let list = vec![sample_data()];
    write_sqlite(&list, &path).unwrap();

    // Declared after `scratch`, so the connection is dropped (and the file
    // closed) before the directory is removed.
    let conn = rusqlite::Connection::open(&path).unwrap();

    let count: i64 = conn
        .query_row("SELECT COUNT(rowid) FROM works", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count, 1);

    let (id, title, type_): (String, String, String) = conn
        .query_row(r#"SELECT "id", "title", "type" FROM works"#, [], |r| {
            Ok((r.get(0)?, r.get(1)?, r.get(2)?))
        })
        .unwrap();
    assert_eq!(id, "https://doi.org/10.1234/abc");
    assert_eq!(title, "A Sample Title");
    assert_eq!(type_, "JournalArticle");

    // The metadata column holds zstd-compressed JSON.
    let blob: Vec<u8> = conn
        .query_row("SELECT metadata FROM works", [], |r| r.get(0))
        .unwrap();
    let decompressed = zstd::decode_all(std::io::Cursor::new(&blob)).unwrap();
    let parsed: serde_json::Value = serde_json::from_slice(&decompressed).unwrap();
    let contributors = parsed["contributors"].as_array().unwrap();
    assert_eq!(contributors.len(), 1);
}
/// The `provider` column written by `write_sqlite` must match the `provider`
/// field of the fixture record.
#[test]
fn test_write_sqlite_roundtrip_provider() {
    /// Removes the scratch directory when dropped, so cleanup also happens
    /// when an assertion below panics.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    // The system temp directory is shared between processes, so the directory
    // name carries the process id to keep concurrent test runs apart.
    let scratch = ScratchDir(
        std::env::temp_dir().join(format!("commonmeta_sqlite_test_sv_{}", std::process::id())),
    );
    std::fs::create_dir_all(&scratch.0).unwrap();
    let path = scratch.0.join("out.sqlite3");
    write_sqlite(&[sample_data()], &path).unwrap();

    // Declared after `scratch`, so the connection is dropped (and the file
    // closed) before the directory is removed.
    let conn = rusqlite::Connection::open(&path).unwrap();
    let provider: String = conn
        .query_row("SELECT provider FROM works", [], |r| r.get(0))
        .unwrap();
    assert_eq!(provider, sample_data().provider);
}
/// Calling `write_sqlite` twice on the same path must leave exactly one row in
/// `works`, i.e. the second call must not add to the output of the first.
#[test]
fn test_write_sqlite_replaces_existing_file() {
    /// Removes the scratch directory when dropped, so cleanup also happens
    /// when an assertion below panics.
    struct ScratchDir(std::path::PathBuf);
    impl Drop for ScratchDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.0).ok();
        }
    }

    // The system temp directory is shared between processes, so the directory
    // name carries the process id to keep concurrent test runs apart.
    let scratch = ScratchDir(std::env::temp_dir().join(format!(
        "commonmeta_sqlite_test_replace_{}",
        std::process::id()
    )));
    std::fs::create_dir_all(&scratch.0).unwrap();
    let path = scratch.0.join("out.sqlite3");

    // Write the same single-record list twice to the same file.
    write_sqlite(&[sample_data()], &path).unwrap();
    write_sqlite(&[sample_data()], &path).unwrap();

    // Declared after `scratch`, so the connection is dropped (and the file
    // closed) before the directory is removed.
    let conn = rusqlite::Connection::open(&path).unwrap();
    let count: i64 = conn
        .query_row("SELECT COUNT(rowid) FROM works", [], |r| r.get(0))
        .unwrap();
    assert_eq!(count, 1);
}
}