use std::path::Path;
use serde::Serialize;
use serde_json::Value;
use url::Url;
use fluent_uri::Uri as FUri;
use crate::data::Data;
use crate::error::{sqlite_err, Error, Result};
use crate::schema_utils::json_schema_errors;
use crate::utils::{normalize_id, normalize_ror};
const COMMONMETA_V1_SCHEMA_URL: &str = "https://commonmeta.org/commonmeta_v1.0.json";
pub fn read(json: &str) -> Result<Data> {
let value: Value = serde_json::from_str(json).map_err(|e| Error::Parse(e.to_string()))?;
if !looks_like_v1(&value) {
return Err(Error::Parse(
"commonmeta input is not schema v1.0 shaped".to_string(),
));
}
serde_json::from_value(value).map_err(|e| Error::Parse(e.to_string()))
}
fn prepare(data: &Data) -> Data {
let mut out = data.clone();
out.schema_version = COMMONMETA_V1_SCHEMA_URL.to_string();
if !out.id.is_empty() {
out.id = normalize_id(&out.id);
}
if out.type_.is_empty() {
out.type_ = "Other".to_string();
}
for c in &mut out.contributors {
if let Some(p) = c.person.as_mut() {
p.affiliations.retain(|a| !a.id.is_empty() || !a.name.is_empty());
}
if let Some(org) = &c.organization {
if org.id.is_empty() && org.name.is_empty() {
c.organization = None;
}
}
}
out.contributors.retain(|c| match c.type_.as_str() {
"Person" => c.person.is_some(),
"Organization" => c.organization.is_some(),
_ => true,
});
out.identifiers.retain(|i| i.identifier != out.id);
for r in &mut out.references {
r.publisher.clear();
r.publication_year.clear();
r.volume.clear();
r.issue.clear();
r.first_page.clear();
r.last_page.clear();
r.unstructured.clear();
}
if !out.publisher.id.is_empty() && normalize_ror(&out.publisher.id).is_empty() {
out.publisher.id.clear();
}
for c in &mut out.contributors {
if let Some(p) = &mut c.person {
for aff in &mut p.affiliations {
if !aff.id.is_empty() && normalize_ror(&aff.id).is_empty() {
aff.id.clear();
}
}
}
if let Some(org) = &mut c.organization {
if !org.id.is_empty() && normalize_ror(&org.id).is_empty() {
org.id.clear();
}
}
}
for r in &mut out.references {
if !r.id.is_empty() {
let normalized = normalize_id(&r.id);
r.id = if normalized.is_empty() {
String::new()
} else {
let candidate = match Url::parse(&normalized) {
Ok(u) => {
let s = u.to_string();
if s.ends_with('/') { s[..s.len() - 1].to_string() } else { s }
}
Err(_) => String::new(),
};
if candidate.is_empty() || FUri::parse(candidate.as_str()).is_err() {
String::new()
} else {
candidate
}
};
}
}
out.references.retain(|r| {
!r.id.is_empty() || !r.key.is_empty() || !r.reference.is_empty() || !r.title.is_empty()
});
if !out.license.url.is_empty() && FUri::parse(out.license.url.as_str()).is_err() {
out.license.url = String::new();
}
out.files.retain(|f| !f.url.is_empty() && FUri::parse(f.url.as_str()).is_ok());
if !out.url.is_empty() && FUri::parse(out.url.as_str()).is_err() {
out.url = String::new();
}
{
let mut seen = std::collections::HashSet::new();
out.geo_locations.retain(|g| {
let key = serde_json::to_string(g).unwrap_or_default();
seen.insert(key)
});
}
for id in &mut out.identifiers {
let known = matches!(id.identifier_type.as_str(),
"ARK" | "arXiv" | "Bibcode" | "DOI" | "Handle" | "ISBN" | "ISSN"
| "OpenAlex" | "PMID" | "PMCID" | "PURL" | "RAiD" | "SWHID"
| "URL" | "URN" | "UUID" | "GUID" | "Other"
);
if !known {
if id.scheme.is_empty() {
id.scheme = std::mem::take(&mut id.identifier_type);
}
id.identifier_type = "Other".to_string();
}
}
{
let raw = std::mem::take(&mut out.container.identifier_type);
let known = matches!(raw.as_str(),
"ARK" | "arXiv" | "Bibcode" | "DOI" | "Handle" | "ISBN" | "ISSN"
| "OpenAlex" | "PMID" | "PMCID" | "PURL" | "RAiD" | "SWHID"
| "URL" | "URN" | "UUID" | "GUID" | "Other"
);
match raw.as_str() {
"EISSN" | "PISSN" => out.container.identifier_type = "ISSN".to_string(),
_ if known => out.container.identifier_type = raw,
_ if !raw.is_empty() => {
if out.container.scheme.is_empty() {
out.container.scheme = raw;
}
out.container.identifier_type = "Other".to_string();
}
_ => {}
}
}
out.additional_titles.retain(|t| !t.title.is_empty());
for f in &mut out.funding_references {
if !f.funder_id.is_empty() && FUri::parse(f.funder_id.as_str()).is_err() {
f.funder_id = String::new();
}
}
out.funding_references.retain(|f| {
!f.funder_name.is_empty()
|| !f.funder_id.is_empty()
|| !f.award_number.is_empty()
|| !f.award_title.is_empty()
});
for rel in &mut out.relations {
let normalized = normalize_id(&rel.id);
rel.id = normalized;
}
out.relations.retain(|r| !r.id.is_empty());
out
}
pub fn write(data: &Data) -> Result<Vec<u8>> {
let out = prepare(data);
let bytes = serde_json::to_vec(&out).map_err(|e| Error::Serialize(e.to_string()))?;
json_schema_errors(&bytes, Some("commonmeta"))?;
Ok(bytes)
}
pub fn write_all(list: &[Data]) -> Result<Vec<u8>> {
let prepared: Vec<Data> = list.iter().map(prepare).collect();
let bytes =
serde_json::to_vec_pretty(&prepared).map_err(|e| Error::Serialize(e.to_string()))?;
json_schema_errors(&bytes, Some("commonmeta"))?;
Ok(bytes)
}
fn looks_like_v1(value: &Value) -> bool {
let Some(obj) = value.as_object() else {
return false;
};
obj.get("schema_version").and_then(Value::as_str) == Some(COMMONMETA_V1_SCHEMA_URL)
|| obj.contains_key("date_published")
|| obj.contains_key("additional_titles")
|| obj.contains_key("additional_descriptions")
|| obj
.get("identifiers")
.and_then(Value::as_array)
.and_then(|ids| ids.first())
.and_then(Value::as_object)
.is_some_and(|id_obj| id_obj.contains_key("identifier_type"))
|| obj
.get("contributors")
.and_then(Value::as_array)
.and_then(|contributors| contributors.first())
.and_then(Value::as_object)
.is_some_and(|contributor| {
contributor.contains_key("person") || contributor.contains_key("organization")
})
}
/// One flattened record as stored in a Parquet file (see `flatten_row` /
/// `unflatten_row`).
///
/// The scalar columns are a convenience view for querying. The `json` column holds the
/// full serialized record and is what `unflatten_row` prefers when reading back.
#[derive(
    Debug,
    Default,
    Clone,
    Serialize,
    parquet_derive::ParquetRecordWriter,
    parquet_derive::ParquetRecordReader,
)]
pub struct CommonmetaRow {
    /// Record id (`Data::id`).
    pub id: String,
    /// Record type (`Data::type_`).
    pub record_type: String,
    pub title: String,
    pub url: String,
    /// First identifier of type `"DOI"`. If there is none, the record id when it contains
    /// `doi.org`; otherwise empty.
    pub doi: String,
    /// Publisher name only; the publisher id is not flattened.
    pub publisher: String,
    pub language: String,
    pub version: String,
    /// License id (`Data::license.id`).
    pub license: String,
    pub container_title: String,
    pub container_type: String,
    /// Container volume.
    pub volume: String,
    /// Container issue.
    pub issue: String,
    /// Container first page.
    pub first_page: String,
    /// Container last page.
    pub last_page: String,
    pub date_published: String,
    /// `Data::dates.created`.
    pub date_created: String,
    pub date_updated: String,
    /// Number of contributors.
    pub contributor_count: i32,
    /// `name()` of the first contributor.
    pub first_author_name: String,
    /// `id()` of the first contributor (ORCID for persons, presumably; not checked here).
    pub first_author_orcid: String,
    /// Subject strings joined with `"; "`.
    pub subjects: String,
    pub description: String,
    pub provider: String,
    pub additional_type: String,
    /// Full JSON serialization of the record; empty if serialization failed.
    pub json: String,
}
/// Flattens a record into a [`CommonmetaRow`] for Parquet output.
///
/// The full record is also embedded as JSON in `json` so it can be restored losslessly.
/// `contributor_count` saturates at `i32::MAX` instead of wrapping.
fn flatten_row(data: &Data) -> CommonmetaRow {
    let doi = match data.identifiers.iter().find(|i| i.identifier_type == "DOI") {
        Some(identifier) => identifier.identifier.clone(),
        // No DOI identifier: fall back to the record id when it is a doi.org URL.
        None if data.id.contains("doi.org") => data.id.clone(),
        None => String::new(),
    };
    let (first_author_name, first_author_orcid) = data
        .contributors
        .first()
        .map(|c| (c.name(), c.id().to_string()))
        .unwrap_or_default();
    let subjects = data
        .subjects
        .iter()
        .map(|s| s.subject.as_str())
        .collect::<Vec<_>>()
        .join("; ");
    CommonmetaRow {
        id: data.id.clone(),
        record_type: data.type_.clone(),
        title: data.title.clone(),
        url: data.url.clone(),
        doi,
        publisher: data.publisher.name.clone(),
        language: data.language.clone(),
        version: data.version.clone(),
        license: data.license.id.clone(),
        container_title: data.container.title.clone(),
        container_type: data.container.type_.clone(),
        volume: data.container.volume.clone(),
        issue: data.container.issue.clone(),
        first_page: data.container.first_page.clone(),
        last_page: data.container.last_page.clone(),
        date_published: data.date_published.clone(),
        date_created: data.dates.created.clone(),
        date_updated: data.date_updated.clone(),
        // `as i32` would silently wrap for absurdly large lists; saturate instead.
        contributor_count: i32::try_from(data.contributors.len()).unwrap_or(i32::MAX),
        first_author_name,
        first_author_orcid,
        subjects,
        description: data.description.clone(),
        provider: data.provider.clone(),
        additional_type: data.additional_type.clone(),
        json: serde_json::to_string(data).unwrap_or_default(),
    }
}
/// Maximum number of records placed in one Parquet row group by [`write_parquet_all`].
const ROW_GROUP_SIZE: usize = 100_000;

/// Encodes `list` as an in-memory Parquet file with up to [`ROW_GROUP_SIZE`] rows per
/// row group.
pub fn write_parquet_all(list: &[Data]) -> Result<Vec<u8>> {
    let rows_per_group = ROW_GROUP_SIZE;
    write_parquet_chunked(list, rows_per_group)
}
/// Encodes `list` as an in-memory Parquet file, writing one row group per
/// `row_group_size` records.
///
/// Each chunk is flattened on its own scoped thread, and the results are written in
/// chunk order. A `row_group_size` of 0 is treated as 1, because `slice::chunks` panics
/// on 0. An empty `list` produces a file with a single empty row group.
///
/// # Errors
/// `Error::Serialize` if a flattening thread panics or the Parquet writer fails.
fn write_parquet_chunked(list: &[Data], row_group_size: usize) -> Result<Vec<u8>> {
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    use parquet::record::RecordWriter;

    let row_group_size = row_group_size.max(1);
    // Always at least one chunk, so `row_chunks[0]` below exists even for empty input.
    let chunks: Vec<&[Data]> = if list.is_empty() {
        vec![&[][..]]
    } else {
        list.chunks(row_group_size).collect()
    };
    let row_chunks: Vec<Vec<CommonmetaRow>> = std::thread::scope(|scope| {
        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| scope.spawn(move || chunk.iter().map(flatten_row).collect::<Vec<_>>()))
            .collect();
        handles
            .into_iter()
            .map(|h| {
                h.join()
                    .map_err(|_| Error::Serialize("parquet flatten thread panicked".to_string()))
            })
            .collect::<Result<Vec<_>>>()
    })?;
    // The derived schema comes from the row type; the first chunk is just a handle to it.
    let schema = row_chunks[0]
        .as_slice()
        .schema()
        .map_err(|e| Error::Serialize(e.to_string()))?;
    let props = std::sync::Arc::new(WriterProperties::builder().build());
    let buffer: Vec<u8> = Vec::new();
    let mut writer = SerializedFileWriter::new(buffer, schema, props)
        .map_err(|e| Error::Serialize(e.to_string()))?;
    for rows in &row_chunks {
        let mut row_group = writer
            .next_row_group()
            .map_err(|e| Error::Serialize(e.to_string()))?;
        rows.as_slice()
            .write_to_row_group(&mut row_group)
            .map_err(|e| Error::Serialize(e.to_string()))?;
        row_group
            .close()
            .map_err(|e| Error::Serialize(e.to_string()))?;
    }
    writer
        .into_inner()
        .map_err(|e| Error::Serialize(e.to_string()))
}
/// Restores a record from a Parquet row.
///
/// The embedded `json` column is preferred. If it is empty or does not deserialize,
/// the record is rebuilt lossily from the flattened columns.
fn unflatten_row(row: &CommonmetaRow) -> Data {
    let exact = if row.json.is_empty() {
        None
    } else {
        serde_json::from_str::<Data>(&row.json).ok()
    };
    exact.unwrap_or_else(|| unflatten_row_lossy(row))
}
/// Rebuilds a best-effort record from the flattened Parquet columns alone.
///
/// This is used when the embedded JSON is missing or unreadable. Only the columns
/// present in [`CommonmetaRow`] are restored. The first author becomes a single person
/// contributor carrying the stored id and the stored display name, split at its last
/// space into given and family name.
fn unflatten_row_lossy(row: &CommonmetaRow) -> Data {
    let contributors = if row.first_author_name.is_empty() && row.first_author_orcid.is_empty() {
        Vec::new()
    } else {
        // Previously the name was dropped, so a name-only row produced an empty person.
        // NOTE(review): the flattened name may come from an organization contributor;
        // the column does not record the contributor type, so a person is assumed.
        let (given_name, family_name) = split_display_name(&row.first_author_name);
        vec![crate::data::Contributor::person(
            crate::data::Person {
                id: row.first_author_orcid.clone(),
                given_name,
                family_name,
                ..Default::default()
            },
            Vec::new(),
        )]
    };
    Data {
        id: row.id.clone(),
        type_: row.record_type.clone(),
        additional_type: row.additional_type.clone(),
        title: row.title.clone(),
        url: row.url.clone(),
        identifiers: if row.doi.is_empty() {
            Vec::new()
        } else {
            vec![crate::data::Identifier {
                identifier: row.doi.clone(),
                identifier_type: "DOI".to_string(),
                ..Default::default()
            }]
        },
        publisher: crate::data::Publisher {
            name: row.publisher.clone(),
            ..Default::default()
        },
        language: row.language.clone(),
        version: row.version.clone(),
        license: crate::data::License {
            id: row.license.clone(),
            ..Default::default()
        },
        container: crate::data::Container {
            title: row.container_title.clone(),
            type_: row.container_type.clone(),
            volume: row.volume.clone(),
            issue: row.issue.clone(),
            first_page: row.first_page.clone(),
            last_page: row.last_page.clone(),
            ..Default::default()
        },
        date_published: row.date_published.clone(),
        date_updated: row.date_updated.clone(),
        dates: crate::data::Dates {
            created: row.date_created.clone(),
            ..Default::default()
        },
        contributors,
        subjects: row
            .subjects
            .split("; ")
            .filter(|s| !s.is_empty())
            .map(|s| crate::data::Subject {
                subject: s.to_string(),
                ..Default::default()
            })
            .collect(),
        description: row.description.clone(),
        provider: row.provider.clone(),
        ..Default::default()
    }
}

/// Splits a `"Given Family"` display name at its last space into `(given, family)`.
/// A name without spaces is treated as a family name only.
fn split_display_name(name: &str) -> (String, String) {
    let name = name.trim();
    match name.rsplit_once(' ') {
        Some((given, family)) => (given.trim_end().to_string(), family.to_string()),
        None => (String::new(), name.to_string()),
    }
}
// Schema applied by `init_sqlite_writer`.
// - `settings`: a key/value table (not read or written elsewhere in this file).
// - `works`: one row per record, keyed by record id. A few denormalized columns support
//   filtering (indexed: type, date_published, date_updated, provider); `valid` is set by
//   `validate_sqlite`; `metadata` holds the record as produced by `serialize_to_row`.
// All statements use IF NOT EXISTS, so re-applying the DDL to an existing file is safe.
const SQLITE_DDL: &str = r#"PRAGMA synchronous=NORMAL;
CREATE TABLE IF NOT EXISTS settings (
"key" TEXT PRIMARY KEY NOT NULL,
"value" TEXT NOT NULL DEFAULT ''
);
CREATE TABLE IF NOT EXISTS works (
"id" TEXT PRIMARY KEY NOT NULL,
"type" TEXT NOT NULL DEFAULT '',
"url" TEXT NOT NULL DEFAULT '',
"title" TEXT NOT NULL DEFAULT '',
"subjects" TEXT NOT NULL DEFAULT '[]',
"language" TEXT NOT NULL DEFAULT '',
"date_published" TEXT NOT NULL DEFAULT '',
"date_updated" TEXT NOT NULL DEFAULT '',
"provider" TEXT NOT NULL DEFAULT '',
"valid" INTEGER NOT NULL DEFAULT 0,
"metadata" BLOB NOT NULL DEFAULT x''
);
CREATE INDEX IF NOT EXISTS works_type ON works("type");
CREATE INDEX IF NOT EXISTS works_date_published ON works("date_published");
CREATE INDEX IF NOT EXISTS works_date_updated ON works("date_updated");
CREATE INDEX IF NOT EXISTS works_provider ON works("provider");"#;
// Upsert of one `works` row. Parameters ?1..?11 follow the column order listed here,
// which matches the field order of `PreparedRow`. INSERT OR REPLACE deletes a
// conflicting row and inserts a new one.
const SQLITE_INSERT: &str = r#"INSERT OR REPLACE INTO works (
"id", "type", "url", "title", "subjects",
"language", "date_published", "date_updated", "provider", "valid", "metadata"
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"#;
/// One `works` row ready for insertion, produced by `serialize_to_row`.
pub struct PreparedRow {
    /// Normalized record id (primary key).
    pub id: String,
    pub type_: String,
    pub url: String,
    pub title: String,
    /// JSON array of the record's subjects.
    pub subjects: String,
    pub language: String,
    pub date_published: String,
    pub date_updated: String,
    pub provider: String,
    /// Schema-validity flag; `serialize_to_row` always sets `false`.
    pub valid: bool,
    /// zstd-compressed JSON of the prepared record. Uncompressed JSON is stored if
    /// compression fails.
    pub metadata: Vec<u8>,
}
/// Runs `prepare` on `data` and packs it into a [`PreparedRow`] for the `works` table.
///
/// `subjects` becomes a JSON array string. `metadata` is the zstd-compressed JSON of the
/// prepared record at zstd's default level (0); the raw JSON bytes are kept if
/// compression fails. Serialization failures yield empty strings, and `valid` is always
/// `false`.
pub fn serialize_to_row(data: Data) -> PreparedRow {
    let prepared = prepare(&data);
    let subjects_json = serde_json::to_string(&prepared.subjects).unwrap_or_default();
    let document = serde_json::to_string(&prepared).unwrap_or_default();
    let metadata = match zstd::encode_all(document.as_bytes(), 0) {
        Ok(compressed) => compressed,
        Err(_) => document.into_bytes(),
    };
    PreparedRow {
        subjects: subjects_json,
        valid: false,
        metadata,
        id: prepared.id,
        type_: prepared.type_,
        url: prepared.url,
        title: prepared.title,
        language: prepared.language,
        date_published: prepared.date_published,
        date_updated: prepared.date_updated,
        provider: prepared.provider,
    }
}
pub(crate) fn init_sqlite_writer(path: &Path, overwrite: bool) -> Result<rusqlite::Connection> {
if overwrite && path.exists() {
std::fs::remove_file(path)
.map_err(|e| Error::Parse(format!("failed to remove '{}': {}", path.display(), e)))?;
}
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open sqlite '{}': {}", path.display(), e)))?;
let _: String = conn.query_row("PRAGMA journal_mode=WAL", [], |r| r.get(0))
.map_err(|e| Error::Parse(format!("failed to set WAL mode: {}", e)))?;
conn.execute_batch(SQLITE_DDL)
.map_err(|e| Error::Parse(format!("failed to create works table: {}", e)))?;
Ok(conn)
}
/// Inserts (or replaces) `rows` into `works` inside a single transaction.
///
/// An empty batch is a no-op. After the commit, a best-effort passive WAL checkpoint is
/// requested; its result and any error from it are ignored.
///
/// # Errors
/// Whatever `sqlite_err` builds when beginning the transaction, preparing the insert,
/// inserting a row (the message names the row id) or committing fails.
pub(crate) fn write_sqlite_batch_rows(
    conn: &rusqlite::Connection,
    rows: Vec<PreparedRow>,
) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let tx = conn
        .unchecked_transaction()
        .map_err(|e| sqlite_err(e, "failed to begin transaction"))?;
    {
        let mut stmt = tx
            .prepare(SQLITE_INSERT)
            .map_err(|e| sqlite_err(e, "failed to prepare insert"))?;
        for row in &rows {
            stmt.execute(rusqlite::params![
                row.id, row.type_, row.url, row.title, row.subjects,
                row.language, row.date_published, row.date_updated, row.provider,
                i32::from(row.valid), row.metadata,
            ])
            .map_err(|e| sqlite_err(e, &format!("failed to insert '{}'", row.id)))?;
        }
    }
    tx.commit()
        .map_err(|e| sqlite_err(e, "failed to commit transaction"))?;
    // `wal_checkpoint` returns a status row. `execute` would report
    // ExecuteReturnedResults for it, so read the row with `query_row` and ignore it.
    let _ = conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", [], |_| Ok(()));
    Ok(())
}
pub fn write_sqlite(data: &[Data], path: &Path) -> Result<()> {
write_sqlite_impl(data, path, true)
}
pub fn upsert_sqlite(data: &[Data], path: &Path) -> Result<()> {
write_sqlite_impl(data, path, false)
}
fn write_sqlite_impl(data: &[Data], path: &Path, overwrite: bool) -> Result<()> {
let rows: Vec<PreparedRow> = data.iter().map(|d| serialize_to_row(d.clone())).collect();
let conn = init_sqlite_writer(path, overwrite)?;
write_sqlite_batch_rows(&conn, rows)
}
pub fn count_sqlite_works(path: &Path) -> Result<usize> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(e.to_string()))?;
let n: i64 = conn
.query_row("SELECT COUNT(*) FROM works", [], |row| row.get(0))
.map_err(|e| Error::Parse(e.to_string()))?;
Ok(n.max(0) as usize)
}
const SQLITE_SELECT: &str = r#"SELECT "metadata" FROM works ORDER BY rowid"#;
fn read_sqlite_rows(
conn: &rusqlite::Connection,
limit: Option<usize>,
offset: usize,
) -> Result<Vec<Data>> {
let sql = match (limit, offset) {
(Some(n), o) => format!("{} LIMIT {} OFFSET {}", SQLITE_SELECT, n, o),
(None, o) if o > 0 => format!("{} LIMIT -1 OFFSET {}", SQLITE_SELECT, o),
_ => SQLITE_SELECT.to_string(),
};
let mut stmt = conn.prepare(&sql).map_err(|e| Error::Parse(e.to_string()))?;
let mut rows = stmt.query([]).map_err(|e| Error::Parse(e.to_string()))?;
let mut results = Vec::new();
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let blob: Vec<u8> = row
.get(0)
.map_err(|e| Error::Parse(format!("failed to read metadata blob: {}", e)))?;
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
results.push(data);
}
Ok(results)
}
pub fn read_sqlite_commonmeta(path: &Path, limit: Option<usize>, offset: usize) -> Result<Vec<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
read_sqlite_rows(&conn, limit, offset)
}
pub fn read_sqlite_by_id(id: &str, path: &Path) -> Result<Option<Data>> {
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let result = conn.query_row(
r#"SELECT "metadata" FROM works WHERE id = ?1 LIMIT 1"#,
rusqlite::params![id],
|row| row.get::<_, Vec<u8>>(0),
);
match result {
Ok(blob) => {
let decompressed = zstd::decode_all(std::io::Cursor::new(&blob))
.map_err(|e| Error::Parse(format!("failed to decompress metadata: {}", e)))?;
let data: Data = serde_json::from_slice(&decompressed)
.map_err(|e| Error::Parse(format!("failed to deserialize metadata: {}", e)))?;
Ok(Some(data))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(Error::Parse(e.to_string())),
}
}
use crate::schema_utils::collect_leaf_errors;
/// Schema violations recorded for one stored work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationError {
    /// `works.id` of the offending row.
    pub id: String,
    /// Error messages for this row (leaf schema errors, or a decode failure message).
    pub errors: Vec<String>,
}

/// Summary returned by `validate_sqlite`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationReport {
    /// Number of rows visited.
    pub total: usize,
    /// Rows that passed, including rows that passed after being repaired.
    pub valid: usize,
    /// Rows that still failed; equals `errors.len()`.
    pub invalid: usize,
    /// Rows repaired and written back (only when fixing was requested).
    pub fixed: usize,
    /// Per-row failures, in visiting order.
    pub errors: Vec<ValidationError>,
}
pub fn validate_sqlite(
path: &Path,
provider: Option<&str>,
work_type: Option<&str>,
limit: usize,
fix: bool,
recheck: bool,
) -> Result<ValidationReport> {
use serde_json::Value;
use crate::schema_utils::SCHEMA_JSON;
let schema_json: Value = serde_json::from_str(SCHEMA_JSON)
.map_err(|e| Error::Parse(format!("failed to parse commonmeta schema: {e}")))?;
let validation_schema = {
let mut merged = serde_json::Map::new();
if let Some(v) = schema_json.get("$schema") { merged.insert("$schema".to_string(), v.clone()); }
if let Some(v) = schema_json.get("$id") { merged.insert("$id".to_string(), v.clone()); }
if let Some(v) = schema_json.get("definitions") { merged.insert("definitions".to_string(), v.clone()); }
if let Some(Value::Object(cm)) = schema_json.get("commonmeta") {
for (k, v) in cm { merged.insert(k.clone(), v.clone()); }
}
Value::Object(merged)
};
let compiled = jsonschema::validator_for(&validation_schema)
.map_err(|e| Error::Parse(format!("failed to compile commonmeta schema: {e}")))?;
let conn = rusqlite::Connection::open(path)
.map_err(|e| Error::Parse(format!("failed to open '{}': {}", path.display(), e)))?;
let _ = conn.execute_batch("PRAGMA cache_size=-65536; PRAGMA mmap_size=4294967296;");
conn.execute_batch(
r#"CREATE TABLE IF NOT EXISTS validation_errors (
id TEXT PRIMARY KEY,
errors TEXT NOT NULL,
checked_at TEXT NOT NULL
);"#,
).map_err(|e| Error::Parse(format!("failed to create validation_errors table: {e}")))?;
let _ = conn.execute_batch(r#"ALTER TABLE works ADD COLUMN "valid" INTEGER NOT NULL DEFAULT 0;"#);
if fix || recheck {
let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
}
let mut where_parts = Vec::new();
if recheck { where_parts.push(r#"works."valid" = 0"#); }
if provider.is_some() { where_parts.push(r#"works."provider" = ?1"#); }
if work_type.is_some() { where_parts.push(r#"works."type" = ?2"#); }
let where_sql = if where_parts.is_empty() {
String::new()
} else {
format!("WHERE {}", where_parts.join(" AND "))
};
let count_sql = format!(r#"SELECT COUNT(*) FROM works {where_sql}"#);
let cursor_sql = format!(
r#"SELECT works.id, works.metadata FROM works {where_sql}
ORDER BY works.rowid LIMIT ?3 OFFSET ?4"#
);
let provider_param = provider.unwrap_or("");
let type_param = work_type.unwrap_or("");
let row_count: u64 = conn
.query_row(&count_sql, rusqlite::params![provider_param, type_param], |r| r.get::<_, i64>(0))
.unwrap_or(0).max(0) as u64;
let total_to_check = if limit == 0 { row_count } else { row_count.min(limit as u64) };
let bar = crate::progress::count_bar("validating", total_to_check);
let mut stmt = conn.prepare(&cursor_sql)
.map_err(|e| Error::Parse(e.to_string()))?;
const BATCH: usize = 10_000;
let mut valid = 0usize;
let mut fixed = 0usize;
let mut report_errors: Vec<ValidationError> = Vec::new();
let mut offset = 0usize;
let mut total = 0usize;
let upsert_error_sql = r#"INSERT INTO validation_errors (id, errors, checked_at)
VALUES (?1, ?2, datetime('now'))
ON CONFLICT(id) DO UPDATE SET errors = excluded.errors, checked_at = excluded.checked_at"#;
loop {
let remaining = if limit == 0 { BATCH } else { limit.saturating_sub(total) };
if remaining == 0 { break; }
let batch_size = BATCH.min(remaining);
let batch_offset = if recheck { 0 } else { offset };
let raw_batch: Vec<(String, Vec<u8>)> = {
let mut rows = stmt
.query(rusqlite::params![provider_param, type_param, batch_size as i64, batch_offset as i64])
.map_err(|e| Error::Parse(e.to_string()))?;
let mut v = Vec::with_capacity(batch_size);
while let Some(row) = rows.next().map_err(|e| Error::Parse(e.to_string()))? {
let id: String = row.get(0).map_err(|e| Error::Parse(e.to_string()))?;
let blob: Vec<u8> = row.get(1).map_err(|e| Error::Parse(e.to_string()))?;
v.push((id, blob));
}
v
};
if raw_batch.is_empty() { break; }
let batch_count = raw_batch.len();
let mut passing_ids: Vec<String> = Vec::new();
let mut fix_updates: Vec<(Vec<u8>, String)> = Vec::new();
let mut error_pairs: Vec<(String, Vec<String>)> = Vec::new();
for (id, blob) in &raw_batch {
let decompressed = match zstd::decode_all(std::io::Cursor::new(blob)) {
Ok(d) => d,
Err(e) => { eprintln!("validate: decompress '{}': {}", id, e); continue; }
};
let doc: Value = match serde_json::from_slice(&decompressed) {
Ok(v) => v,
Err(e) => { eprintln!("validate: parse '{}': {}", id, e); continue; }
};
let raw_errs: Vec<jsonschema::ValidationError<'_>> =
compiled.iter_errors(&doc).collect();
let errs = collect_leaf_errors(&raw_errs);
if errs.is_empty() {
passing_ids.push(id.clone());
valid += 1;
} else if fix {
if let Ok(data) = serde_json::from_value::<Data>(doc) {
let repaired_row = serialize_to_row(data);
let repaired_doc: Value = serde_json::from_slice(
&zstd::decode_all(std::io::Cursor::new(&repaired_row.metadata))
.unwrap_or_default()
).unwrap_or(Value::Null);
let repaired_raw: Vec<jsonschema::ValidationError<'_>> =
compiled.iter_errors(&repaired_doc).collect();
let remaining_errs = collect_leaf_errors(&repaired_raw);
if remaining_errs.is_empty() {
fix_updates.push((repaired_row.metadata, id.clone()));
fixed += 1;
valid += 1;
} else {
error_pairs.push((id.clone(), remaining_errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: remaining_errs });
}
} else {
error_pairs.push((id.clone(), errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: errs });
}
} else {
error_pairs.push((id.clone(), errs.clone()));
report_errors.push(ValidationError { id: id.clone(), errors: errs });
}
bar.inc(1);
}
let tx = conn.unchecked_transaction()
.map_err(|e| Error::Parse(format!("begin transaction: {e}")))?;
for id in &passing_ids {
tx.execute(r#"UPDATE works SET "valid" = 1 WHERE id = ?1"#, [id]).ok();
tx.execute("DELETE FROM validation_errors WHERE id = ?1", [id]).ok();
}
for (meta, id) in &fix_updates {
tx.execute(
r#"UPDATE works SET "metadata" = ?1, "valid" = 1 WHERE id = ?2"#,
rusqlite::params![meta, id],
).ok();
tx.execute("DELETE FROM validation_errors WHERE id = ?1", [id]).ok();
}
for (id, errors) in &error_pairs {
let errors_json = serde_json::to_string(errors).unwrap_or_default();
tx.execute(upsert_error_sql, rusqlite::params![id, errors_json]).ok();
}
tx.commit().map_err(|e| Error::Parse(format!("commit transaction: {e}")))?;
total += batch_count;
offset += batch_count;
if batch_count < batch_size { break; }
if recheck && passing_ids.is_empty() && fix_updates.is_empty() { break; }
}
bar.finish_and_clear();
Ok(ValidationReport {
total,
valid,
invalid: report_errors.len(),
fixed,
errors: report_errors,
})
}
/// Decodes a Parquet file produced by `write_parquet_all` back into records.
///
/// Each row is restored with `unflatten_row`, which prefers the embedded JSON.
///
/// # Errors
/// `Error::Parse` if the bytes are not a readable Parquet file or a row group cannot be
/// read into [`CommonmetaRow`]s.
pub fn read_parquet_all(bytes: &[u8]) -> Result<Vec<Data>> {
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::RecordReader;
    let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes.to_vec()))
        .map_err(|e| Error::Parse(e.to_string()))?;
    let mut out: Vec<Data> = Vec::new();
    for i in 0..reader.num_row_groups() {
        let mut row_group_reader = reader
            .get_row_group(i)
            .map_err(|e| Error::Parse(e.to_string()))?;
        let num_rows = row_group_reader.metadata().num_rows() as usize;
        // Read every row group into its own Vec. The parquet_derive-generated
        // `RecordReader for Vec<T>` pushes `num_records` defaults and then assigns
        // column values into `records[..num_records]`, the *first* rows of the Vec.
        // Reusing one Vec across groups would overwrite earlier rows and leave
        // defaulted ones behind.
        // NOTE(review): behavior as of the parquet_derive versions reviewed; a fresh
        // Vec per group is correct regardless.
        let mut group_rows: Vec<CommonmetaRow> = Vec::with_capacity(num_rows);
        group_rows
            .read_from_row_group(&mut *row_group_reader, num_rows)
            .map_err(|e| Error::Parse(e.to_string()))?;
        out.extend(group_rows.iter().map(unflatten_row));
    }
    Ok(out)
}
// Unit tests for row flattening, Parquet round-trips and the SQLite `works` output.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::{Contributor, Identifier, Person};

    // A journal article with one DOI identifier and one person contributor with an ORCID.
    fn sample_data() -> Data {
        Data {
            id: "https://doi.org/10.1234/abc".to_string(),
            type_: "JournalArticle".to_string(),
            title: "A Sample Title".to_string(),
            identifiers: vec![Identifier {
                identifier: "10.1234/abc".to_string(),
                identifier_type: "DOI".to_string(),
                ..Default::default()
            }],
            contributors: vec![Contributor::person(
                Person {
                    given_name: "Jane".to_string(),
                    family_name: "Doe".to_string(),
                    id: "https://orcid.org/0000-0002-1825-0097".to_string(),
                    ..Default::default()
                },
                Vec::new(),
            )],
            ..Data::default()
        }
    }

    // The scalar columns mirror the record, and the DOI comes from the DOI identifier.
    #[test]
    fn test_flatten_row_basic() {
        let row = flatten_row(&sample_data());
        assert_eq!(row.id, "https://doi.org/10.1234/abc");
        assert_eq!(row.record_type, "JournalArticle");
        assert_eq!(row.title, "A Sample Title");
        assert_eq!(row.doi, "10.1234/abc");
        assert_eq!(row.first_author_name, "Jane Doe");
        assert_eq!(
            row.first_author_orcid,
            "https://orcid.org/0000-0002-1825-0097"
        );
        assert_eq!(row.contributor_count, 1);
    }

    // Without a DOI identifier, a doi.org record id is used as the DOI column.
    #[test]
    fn test_flatten_row_doi_fallback_from_id() {
        let mut data = sample_data();
        data.identifiers.clear();
        let row = flatten_row(&data);
        assert_eq!(row.doi, "https://doi.org/10.1234/abc");
    }

    // Output carries the Parquet "PAR1" magic at both ends.
    #[test]
    fn test_write_parquet_all_roundtrip() {
        let list = vec![sample_data()];
        let bytes = write_parquet_all(&list).unwrap();
        assert!(!bytes.is_empty());
        assert_eq!(&bytes[0..4], b"PAR1");
        assert_eq!(&bytes[bytes.len() - 4..], b"PAR1");
    }

    // An empty input still produces a Parquet file.
    #[test]
    fn test_write_parquet_all_empty() {
        let list: Vec<Data> = vec![];
        let bytes = write_parquet_all(&list).unwrap();
        assert_eq!(&bytes[0..4], b"PAR1");
    }

    // The file's metadata reports the row count and the expected column names.
    #[test]
    fn test_write_parquet_all_readable_schema_and_rows() {
        use parquet::file::reader::{FileReader, SerializedFileReader};
        let list = vec![sample_data(), sample_data()];
        let bytes = write_parquet_all(&list).unwrap();
        let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes)).unwrap();
        let metadata = reader.metadata();
        assert_eq!(metadata.file_metadata().num_rows(), 2);
        let schema = metadata.file_metadata().schema_descr();
        let column_names: Vec<String> = (0..schema.num_columns())
            .map(|i| schema.column(i).name().to_string())
            .collect();
        assert!(column_names.iter().any(|c| c == "id"));
        assert!(column_names.iter().any(|c| c == "record_type"));
        assert!(column_names.iter().any(|c| c == "title"));
        assert!(column_names.iter().any(|c| c == "doi"));
        assert!(column_names.iter().any(|c| c == "first_author_name"));
    }

    // A row-group size of 1 yields one row group per record in a single file.
    // Only the record count is checked after reading back, not the record contents.
    #[test]
    fn test_write_parquet_chunked_uses_multiple_row_groups_in_one_file() {
        use parquet::file::reader::{FileReader, SerializedFileReader};
        let list = vec![sample_data(), sample_data(), sample_data()];
        let bytes = write_parquet_chunked(&list, 1).unwrap();
        let reader = SerializedFileReader::new(::bytes::Bytes::from(bytes.clone())).unwrap();
        assert_eq!(reader.num_row_groups(), 3);
        assert_eq!(reader.metadata().file_metadata().num_rows(), 3);
        let roundtripped = read_parquet_all(&bytes).unwrap();
        assert_eq!(roundtripped.len(), 3);
    }

    // A single record survives write + read unchanged.
    #[test]
    fn test_write_read_parquet_roundtrip() {
        let list = vec![sample_data()];
        let bytes = write_parquet_all(&list).unwrap();
        let roundtripped = read_parquet_all(&bytes).unwrap();
        assert_eq!(roundtripped.len(), 1);
        assert_eq!(roundtripped[0], list[0]);
    }

    // Fields that have no flattened column (additional titles/descriptions, extra
    // contributors with affiliations, extra identifiers) still round-trip, via the
    // embedded JSON column.
    #[test]
    fn test_write_read_parquet_roundtrip_preserves_fields_outside_flattened_view() {
        use crate::data::{Affiliation, Description, Subject, Title};
        let mut data = sample_data();
        data.additional_titles.push(Title {
            title: "An Alternative Title".to_string(),
            type_: "TranslatedTitle".to_string(),
            ..Default::default()
        });
        data.contributors.push(Contributor::person(
            Person {
                given_name: "John".to_string(),
                family_name: "Smith".to_string(),
                affiliations: vec![Affiliation {
                    id: "https://ror.org/02catss52".to_string(),
                    name: "Example University".to_string(),
                    ..Default::default()
                }],
                ..Default::default()
            },
            Vec::new(),
        ));
        data.identifiers.push(Identifier {
            identifier: "1234-5678".to_string(),
            identifier_type: "ISSN".to_string(),
            ..Default::default()
        });
        data.additional_descriptions.push(Description {
            description: "A second description".to_string(),
            type_: "TechnicalInfo".to_string(),
            ..Default::default()
        });
        data.subjects = vec![
            Subject {
                subject: "Biology".to_string(),
                ..Default::default()
            },
            Subject {
                subject: "Chemistry".to_string(),
                ..Default::default()
            },
        ];
        let bytes = write_parquet_all(&[data.clone()]).unwrap();
        let roundtripped = read_parquet_all(&bytes).unwrap();
        assert_eq!(roundtripped.len(), 1);
        assert_eq!(roundtripped[0], data);
        assert_eq!(roundtripped[0].additional_titles.len(), 1);
        assert_eq!(roundtripped[0].contributors.len(), 2);
        assert_eq!(
            roundtripped[0].contributors[1].affiliations()[0].name,
            "Example University"
        );
        assert_eq!(roundtripped[0].identifiers.len(), 2);
        assert_eq!(roundtripped[0].additional_descriptions.len(), 1);
        assert_eq!(roundtripped[0].subjects.len(), 2);
    }

    // Reading the file written for an empty list yields no records.
    #[test]
    fn test_read_parquet_all_empty() {
        let bytes = write_parquet_all(&[]).unwrap();
        let roundtripped = read_parquet_all(&bytes).unwrap();
        assert!(roundtripped.is_empty());
    }

    // write_sqlite creates `works` with one row whose columns and zstd-compressed
    // metadata reflect the record.
    #[test]
    fn test_write_sqlite_creates_works_table() {
        let dir = std::env::temp_dir().join("commonmeta_sqlite_test");
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("out.sqlite3");
        let list = vec![sample_data()];
        write_sqlite(&list, &path).unwrap();
        {
            let conn = rusqlite::Connection::open(&path).unwrap();
            let count: i64 = conn.query_row("SELECT COUNT(*) FROM works", [], |r| r.get(0)).unwrap();
            assert_eq!(count, 1);
            let (id, title, type_): (String, String, String) = conn.query_row(
                r#"SELECT "id", "title", "type" FROM works"#, [],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
            ).unwrap();
            assert_eq!(id, "https://doi.org/10.1234/abc");
            assert_eq!(title, "A Sample Title");
            assert_eq!(type_, "JournalArticle");
            let blob: Vec<u8> = conn.query_row("SELECT metadata FROM works", [], |r| r.get(0)).unwrap();
            let decompressed = zstd::decode_all(std::io::Cursor::new(&blob)).unwrap();
            let parsed: serde_json::Value = serde_json::from_slice(&decompressed).unwrap();
            let contributors = parsed["contributors"].as_array().unwrap();
            assert_eq!(contributors.len(), 1);
        }
        std::fs::remove_dir_all(&dir).ok();
    }

    // The provider column matches the record's provider (empty in sample_data).
    #[test]
    fn test_write_sqlite_roundtrip_provider() {
        let dir = std::env::temp_dir().join("commonmeta_sqlite_test_sv");
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("out.sqlite3");
        write_sqlite(&[sample_data()], &path).unwrap();
        {
            let conn = rusqlite::Connection::open(&path).unwrap();
            let provider: String = conn.query_row("SELECT provider FROM works", [], |r| r.get(0)).unwrap();
            assert_eq!(provider, sample_data().provider);
        }
        std::fs::remove_dir_all(&dir).ok();
    }

    // Writing twice to the same path replaces the file rather than accumulating rows.
    #[test]
    fn test_write_sqlite_replaces_existing_file() {
        let dir = std::env::temp_dir().join("commonmeta_sqlite_test_replace");
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("out.sqlite3");
        write_sqlite(&[sample_data()], &path).unwrap();
        write_sqlite(&[sample_data()], &path).unwrap();
        {
            let conn = rusqlite::Connection::open(&path).unwrap();
            let count: i64 = conn.query_row("SELECT COUNT(*) FROM works", [], |r| r.get(0)).unwrap();
            assert_eq!(count, 1);
        }
        std::fs::remove_dir_all(&dir).ok();
    }
}