use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use flate2::read::GzDecoder;
use rayon::prelude::*;
use serde::Deserialize;
use crate::author_utils::{
cleanup_author, infer_contributor_type, normalize_contributor_roles, split_person_name,
};
use crate::data::{
Affiliation, Container, Contributor, Data, FundingReference, Identifier, License, Organization,
Person, Subject,
};
use crate::doi_utils::{
fetch_doi_ra, lookup_prefix_cache, normalize_doi, store_prefix_cache, validate_prefix,
};
use crate::error::{Error, Result};
use crate::formats::commonmeta::{
init_sqlite_writer, serialize_to_row, update_rows_in_tx, write_rows_in_tx, PreparedRow,
};
use crate::utils::{get_language, normalize_orcid, validate_id};
/// NCBI FTP location of the gzipped PMC-ids CSV (DOI / PMCID / PMID mapping).
pub const PMC_IDS_URL: &str = "https://ftp.ncbi.nlm.nih.gov/pub/pmc/PMC-ids.csv.gz";
/// File name used for the cached PMC-ids download and for the `~/Downloads` lookup.
pub const PMC_IDS_CACHE_KEY: &str = "PMC-ids.csv.gz";
/// Number of converted rows collected before each `upsert_pmc_batch` call.
const BATCH_SIZE: usize = 50_000;

// Zero-based column positions in PMC-ids.csv; each column directly follows the previous one:
// Journal Title, ISSN, eISSN, Year, Volume, Issue, Page, DOI, PMCID, PMID, Manuscript Id.
// (The trailing Release Date column is not read.)
const COL_JOURNAL: usize = 0;
const COL_ISSN: usize = COL_JOURNAL + 1;
const COL_EISSN: usize = COL_ISSN + 1;
const COL_YEAR: usize = COL_EISSN + 1;
const COL_VOLUME: usize = COL_YEAR + 1;
const COL_ISSUE: usize = COL_VOLUME + 1;
const COL_PAGE: usize = COL_ISSUE + 1;
const COL_DOI: usize = COL_PAGE + 1;
const COL_PMCID: usize = COL_DOI + 1;
const COL_PMID: usize = COL_PMCID + 1;
const COL_MANUSCRIPT: usize = COL_PMID + 1;
/// Returns column `col` of `record` with surrounding whitespace removed, or an empty
/// string when the record has fewer columns.
fn get(record: &csv::StringRecord, col: usize) -> String {
    record
        .get(col)
        .map_or_else(String::new, |field| field.trim().to_owned())
}
/// Splits a page string such as `"55-61"` into `(first_page, last_page)`.
///
/// Only the first `-` separates the two parts, and both parts are trimmed. If the part
/// before the dash is blank (e.g. `"-61"`), or there is no dash at all, the whole trimmed
/// input becomes the first page and the last page is empty.
fn parse_pages(page: &str) -> (String, String) {
    if let Some((head, tail)) = page.split_once('-') {
        let head = head.trim();
        if !head.is_empty() {
            // A blank tail ("55-") trims to "", i.e. no last page.
            return (head.to_string(), tail.trim().to_string());
        }
    }
    (page.trim().to_string(), String::new())
}
/// Converts one PMC-ids CSV record into a minimal `Data` work.
///
/// The normalized DOI becomes the work `id`. Records whose DOI column is blank, or whose
/// DOI normalizes to an empty string, yield `None`. Journal, ISSN, volume, issue and pages
/// go into the container. PMID, PMCID, eISSN (as `Other` with scheme `EISSN`) and the
/// manuscript id (as `article_id`) become identifiers, in that order, when non-empty.
fn row_to_data(record: &csv::StringRecord) -> Option<Data> {
    let raw_doi = get(record, COL_DOI);
    if raw_doi.is_empty() {
        return None;
    }
    let id = normalize_doi(&raw_doi);
    if id.is_empty() {
        return None;
    }

    // Yields `label` only when `value` is present, so empty columns leave empty tags.
    let label_if = |value: &str, label: &str| -> String {
        if value.is_empty() {
            String::new()
        } else {
            label.to_owned()
        }
    };

    let journal = get(record, COL_JOURNAL);
    let issn = get(record, COL_ISSN);
    let page = get(record, COL_PAGE);
    let (first_page, last_page) = match page.as_str() {
        "" => Default::default(),
        pages => parse_pages(pages),
    };

    let container = Container {
        type_: label_if(&journal, "Journal"),
        identifier_type: label_if(&issn, "ISSN"),
        title: journal,
        identifier: issn,
        volume: get(record, COL_VOLUME),
        issue: get(record, COL_ISSUE),
        first_page,
        last_page,
        ..Default::default()
    };

    // (value, identifier_type, scheme) in the order they are emitted.
    let identifiers: Vec<Identifier> = [
        (get(record, COL_PMID), "PMID", ""),
        (get(record, COL_PMCID), "PMCID", ""),
        (get(record, COL_EISSN), "Other", "EISSN"),
        (get(record, COL_MANUSCRIPT), "article_id", ""),
    ]
    .into_iter()
    .filter(|(value, _, _)| !value.is_empty())
    .map(|(identifier, identifier_type, scheme)| Identifier {
        identifier,
        identifier_type: identifier_type.to_string(),
        scheme: scheme.to_string(),
        ..Default::default()
    })
    .collect();

    Some(Data {
        id,
        type_: "JournalArticle".to_string(),
        date_published: get(record, COL_YEAR),
        container,
        identifiers,
        provider: "PubMed".to_string(),
        ..Default::default()
    })
}
/// Returns `true` for the identifier types a PMC-ids import may add to an already-stored
/// work (PMID, PMCID and the manuscript `article_id`).
fn is_pmc_merge_identifier(id: &Identifier) -> bool {
    const MERGEABLE: [&str; 3] = ["PMID", "PMCID", "article_id"];
    MERGEABLE.contains(&id.identifier_type.as_str())
}
/// Resolves the DOI registration agency for `doi`, memoized by DOI prefix.
///
/// Lookup order:
/// 1. the caller-owned in-memory `batch_cache`;
/// 2. `lookup_prefix_cache` on `conn`;
/// 3. unless `no_network`, `fetch_doi_ra`, whose result is stored via `store_prefix_cache`.
///
/// Misses are memoized as `None` as well, so each prefix is looked up at most once per
/// cache. Returns `None` when `validate_prefix` rejects the DOI or no agency was found.
fn resolve_ra(
    conn: &rusqlite::Connection,
    batch_cache: &mut HashMap<String, Option<String>>,
    doi: &str,
    no_network: bool,
) -> Option<String> {
    let prefix = validate_prefix(doi)?;
    if let Some(memo) = batch_cache.get(&prefix) {
        return memo.clone();
    }
    let resolved = match lookup_prefix_cache(conn, &prefix) {
        hit @ Some(_) => hit,
        None if no_network => None,
        None => {
            let fetched = fetch_doi_ra(&prefix);
            if let Some(ra) = &fetched {
                store_prefix_cache(conn, &prefix, ra);
            }
            fetched
        }
    };
    batch_cache.insert(prefix, resolved.clone());
    resolved
}
fn upsert_pmc_batch(
conn: &rusqlite::Connection,
batch: &[Data],
ra_cache: &mut HashMap<String, Option<String>>,
no_network: bool,
) -> Result<usize> {
if batch.is_empty() {
return Ok(0);
}
const CHUNK: usize = 900;
let ids: Vec<&str> = batch.iter().map(|d| d.id.as_str()).collect();
let mut existing: HashMap<String, Vec<u8>> = HashMap::new();
for chunk in ids.chunks(CHUNK) {
let placeholders = chunk
.iter()
.enumerate()
.map(|(i, _)| format!("?{}", i + 1))
.collect::<Vec<_>>()
.join(", ");
let sql = format!("SELECT id, metadata FROM works WHERE id IN ({placeholders})");
let mut stmt = conn
.prepare(&sql)
.map_err(|e| Error::Parse(format!("prepare SELECT: {e}")))?;
let rows = stmt
.query_map(rusqlite::params_from_iter(chunk.iter()), |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
})
.map_err(|e| Error::Parse(format!("query existing IDs: {e}")))?;
for r in rows {
let (id, blob) = r.map_err(|e| Error::Parse(format!("fetch row: {e}")))?;
existing.insert(id, blob);
}
}
let new_data: Vec<Data> = batch
.iter()
.filter(|d| !existing.contains_key(&d.id))
.cloned()
.map(|mut d| {
if let Some(ra) = resolve_ra(conn, ra_cache, &d.id, no_network) {
d.provider = ra;
}
d
})
.collect();
let new_rows: Vec<_> = new_data.into_par_iter().map(serialize_to_row).collect();
let updated_results: Vec<Result<Option<PreparedRow>>> = batch
.par_iter()
.filter(|d| existing.contains_key(&d.id))
.map(|d| -> Result<Option<PreparedRow>> {
let new_pmc_ids: Vec<Identifier> = d
.identifiers
.iter()
.filter(|i| is_pmc_merge_identifier(i))
.cloned()
.collect();
if new_pmc_ids.is_empty() {
return Ok(None);
}
let blob = existing.get(d.id.as_str()).unwrap();
let json = zstd::decode_all(std::io::Cursor::new(blob))
.map_err(|e| Error::Parse(format!("decompress '{}': {e}", d.id)))?;
let mut record: Data = serde_json::from_slice(&json)
.map_err(|e| Error::Parse(format!("deserialize '{}': {e}", d.id)))?;
let before = record.identifiers.len();
for new_id in &new_pmc_ids {
let already_has = record
.identifiers
.iter()
.any(|i| i.identifier_type == new_id.identifier_type);
if !already_has {
record.identifiers.push(new_id.clone());
}
}
if record.identifiers.len() > before {
Ok(Some(serialize_to_row(record)))
} else {
Ok(None)
}
})
.collect();
let mut updated_rows = Vec::with_capacity(updated_results.len());
for result in updated_results {
if let Some(row) = result? {
updated_rows.push(row);
}
}
if new_rows.is_empty() && updated_rows.is_empty() {
return Ok(0);
}
let insert_count = new_rows.len();
let tx = conn
.unchecked_transaction()
.map_err(|e| Error::Parse(format!("begin transaction: {e}")))?;
write_rows_in_tx(&tx, &new_rows)?;
let update_count = update_rows_in_tx(&tx, &updated_rows)?;
tx.commit()
.map_err(|e| Error::Parse(format!("commit batch: {e}")))?;
let _ = conn.execute("PRAGMA wal_checkpoint(PASSIVE)", []);
Ok(insert_count + update_count)
}
/// Streams a gzipped PMC-ids CSV into the commonmeta SQLite database at `output_path`.
///
/// Each record is converted with `row_to_data`, and rows without a usable DOI are skipped.
/// Converted rows are written in batches of `BATCH_SIZE` via `upsert_pmc_batch`. Records
/// that fail CSV parsing are reported on stderr and skipped.
///
/// A `limit` of 0 means "no limit". Otherwise reading stops once the rows already written
/// plus the rows pending in the current batch reach `limit`. `no_network` is forwarded to
/// the registration-agency lookup.
///
/// Returns the total number of rows inserted or updated.
///
/// # Errors
/// `Error::Parse` if `gz_path` cannot be opened. Errors from `init_sqlite_writer` and
/// `upsert_pmc_batch` are propagated.
pub fn stream_pmc_ids_to_sqlite(
    gz_path: &Path,
    output_path: &Path,
    limit: usize,
    no_network: bool,
) -> Result<usize> {
    let gz_file = File::open(gz_path)
        .map_err(|e| Error::Parse(format!("open '{}': {}", gz_path.display(), e)))?;
    let mut csv_reader = csv::ReaderBuilder::new()
        .has_headers(true)
        .from_reader(GzDecoder::new(BufReader::new(gz_file)));
    let conn = init_sqlite_writer(output_path, false)?;

    let mut pending: Vec<Data> = Vec::with_capacity(BATCH_SIZE);
    let mut ra_cache: HashMap<String, Option<String>> = HashMap::new();
    let mut written = 0usize;
    let limited = limit > 0;

    for row in csv_reader.records() {
        if limited && written + pending.len() >= limit {
            break;
        }
        let record = match row {
            Ok(record) => record,
            Err(e) => {
                eprintln!("pubmed: CSV parse error: {e}");
                continue;
            }
        };
        let Some(data) = row_to_data(&record) else {
            continue;
        };
        pending.push(data);
        if pending.len() < BATCH_SIZE {
            continue;
        }
        let changed = upsert_pmc_batch(&conn, &pending, &mut ra_cache, no_network)?;
        written += changed;
        eprintln!("pubmed: {changed} inserted/updated in batch ({written} total)");
        pending.clear();
    }

    // Flush the final, partially filled batch.
    if !pending.is_empty() {
        written += upsert_pmc_batch(&conn, &pending, &mut ra_cache, no_network)?;
    }
    Ok(written)
}
/// Cache lifetime passed to `ensure_cached_path` for the PMC-ids download (30 days).
const PMC_IDS_CACHE_TTL: std::time::Duration =
    std::time::Duration::from_secs(60 * 60 * 24 * 30);
/// Base URL of the Europe PMC REST web services.
const EPMC_API_BASE: &str = "https://www.ebi.ac.uk/europepmc/webservices/rest";
/// Envelope returned by the Europe PMC `/article/{source}/{id}` endpoint; the record is
/// under `result`. Unlike the other `Epmc*` types, `result` has no serde default, so JSON
/// without a `result` key fails to deserialize as this type.
#[derive(Debug, Deserialize)]
struct EpmcArticleResponse {
result: EpmcResult,
}
/// Response of the Europe PMC `/search` endpoint (requested with `resulttype=core`).
/// All fields default when absent and unknown keys are ignored, so any JSON object
/// deserializes into this type.
#[derive(Debug, Deserialize, Default)]
struct EpmcSearchResponse {
/// Total number of matches (`hitCount`); deserialized but not read in this module,
/// hence the `dead_code` allowance.
#[serde(rename = "hitCount", default)]
#[allow(dead_code)]
hit_count: u64,
/// Cursor for the next page (`nextCursorMark`); empty when absent.
#[serde(rename = "nextCursorMark", default)]
next_cursor_mark: String,
/// The hits on this page (`resultList`).
#[serde(rename = "resultList", default)]
result_list: EpmcResultList,
}
/// Wrapper object around the `resultList.result` array.
#[derive(Debug, Deserialize, Default)]
struct EpmcResultList {
#[serde(default)]
result: Vec<EpmcResult>,
}
/// One Europe PMC record (the `core` result type).
///
/// Every field uses `#[serde(default)]` and unknown keys are ignored, so any JSON object,
/// even an unrelated one, deserializes successfully with missing keys left empty.
#[derive(Debug, Deserialize, Default)]
struct EpmcResult {
#[serde(default)]
pmid: String,
#[serde(default)]
pmcid: String,
#[serde(default)]
doi: String,
#[serde(default)]
title: String,
#[serde(rename = "authorList", default)]
author_list: EpmcAuthorList,
#[serde(rename = "journalInfo", default)]
journal_info: EpmcJournalInfo,
#[serde(rename = "pubYear", default)]
pub_year: String,
/// Raw page string such as `"55-61"` or `"e01561"`.
#[serde(rename = "pageInfo", default)]
page_info: String,
#[serde(rename = "abstractText", default)]
abstract_text: String,
/// Language code as supplied by Europe PMC (e.g. `"eng"` in the tests).
#[serde(default)]
language: String,
#[serde(rename = "pubTypeList", default)]
pub_type_list: EpmcPubTypeList,
#[serde(rename = "keywordList", default)]
keyword_list: EpmcKeywordList,
#[serde(rename = "meshHeadingList", default)]
mesh_heading_list: EpmcMeshHeadingList,
#[serde(rename = "grantsList", default)]
grants_list: EpmcGrantsList,
/// Free-text license string (e.g. `"cc by"`), later matched against SPDX.
#[serde(default)]
license: String,
#[serde(rename = "firstPublicationDate", default)]
first_publication_date: String,
#[serde(rename = "dateOfRevision", default)]
date_of_revision: String,
#[serde(rename = "arxivId", default)]
arxiv_id: String,
}
/// Wrapper object around the `authorList.author` array.
#[derive(Debug, Deserialize, Default)]
struct EpmcAuthorList {
#[serde(default)]
author: Vec<EpmcAuthor>,
}
/// One author entry. Either the split `firstName`/`lastName` or the combined `fullName`
/// may be populated.
#[derive(Debug, Deserialize, Default)]
struct EpmcAuthor {
#[serde(rename = "firstName", default)]
first_name: String,
#[serde(rename = "lastName", default)]
last_name: String,
#[serde(rename = "fullName", default)]
full_name: String,
/// A single free-text affiliation string.
#[serde(default)]
affiliation: String,
/// Optional typed author identifier; only ORCIDs are used downstream.
#[serde(rename = "authorId", default)]
author_id: Option<EpmcAuthorId>,
}
/// Typed author identifier, e.g. `{"type": "ORCID", "value": "..."}`.
#[derive(Debug, Deserialize, Default)]
struct EpmcAuthorId {
#[serde(rename = "type", default)]
type_: String,
#[serde(default)]
value: String,
}
/// `journalInfo` object: issue-level data plus the nested journal description.
#[derive(Debug, Deserialize, Default)]
struct EpmcJournalInfo {
#[serde(default)]
issue: String,
#[serde(default)]
volume: String,
#[serde(rename = "printPublicationDate", default)]
print_publication_date: String,
#[serde(default)]
journal: EpmcJournal,
}
/// Journal title and ISSNs.
#[derive(Debug, Deserialize, Default)]
struct EpmcJournal {
#[serde(default)]
title: String,
#[serde(default)]
issn: String,
/// Presumably the electronic ISSN (used as a fallback when `issn` is empty) — confirm
/// against the Europe PMC schema.
#[serde(default)]
essn: String,
}
/// Wrapper object around the `pubTypeList.pubType` array of type names.
#[derive(Debug, Deserialize, Default)]
struct EpmcPubTypeList {
#[serde(rename = "pubType", default)]
pub_type: Vec<String>,
}
/// Wrapper object around the `keywordList.keyword` array.
#[derive(Debug, Deserialize, Default)]
struct EpmcKeywordList {
#[serde(default)]
keyword: Vec<String>,
}
/// Wrapper object around the `meshHeadingList.meshHeading` array.
#[derive(Debug, Deserialize, Default)]
struct EpmcMeshHeadingList {
#[serde(rename = "meshHeading", default)]
mesh_heading: Vec<EpmcMeshHeading>,
}
/// One MeSH heading; only the descriptor name is read.
#[derive(Debug, Deserialize, Default)]
struct EpmcMeshHeading {
#[serde(rename = "descriptorName", default)]
descriptor_name: String,
}
/// Wrapper object around the `grantsList.grant` array.
#[derive(Debug, Deserialize, Default)]
struct EpmcGrantsList {
#[serde(default)]
grant: Vec<EpmcGrant>,
}
/// One grant: award number and funding agency name.
#[derive(Debug, Deserialize, Default)]
struct EpmcGrant {
#[serde(rename = "grantId", default)]
grant_id: String,
#[serde(default)]
agency: String,
}
/// Maps Europe PMC publication types to a commonmeta work type.
///
/// Entries are compared case-insensitively and the first one that matches a known type
/// wins. When nothing matches (including an empty list), the work is a `"JournalArticle"`.
fn epmc_type_to_commonmeta(pub_types: &[String]) -> &'static str {
    pub_types
        .iter()
        .find_map(|raw| match raw.to_lowercase().as_str() {
            "book-chapter" | "book chapter" => Some("BookChapter"),
            "book" => Some("Book"),
            "conference paper" | "conference-paper" => Some("ProceedingsArticle"),
            "dataset" => Some("Dataset"),
            "thesis" => Some("Dissertation"),
            _ => None,
        })
        .unwrap_or("JournalArticle")
}
/// Converts a Europe PMC author into a commonmeta `Contributor` with the `Author` role.
///
/// * The ORCID is taken from `authorId` only when its type is `ORCID` (any case) and its
///   value is non-empty.
/// * When `firstName` or `lastName` is present they are used, with `cleanup_author`
///   applied to the given name. Otherwise `fullName` is cleaned and split with
///   `split_person_name`.
/// * `infer_contributor_type` decides between person and organization. Organizations are
///   named by the family name (or the trimmed full name) and drop the affiliation.
fn epmc_author_to_contributor(author: EpmcAuthor) -> Contributor {
    let orcid = match author.author_id {
        Some(aid) if aid.type_.to_uppercase() == "ORCID" && !aid.value.is_empty() => {
            normalize_orcid(&aid.value)
        }
        _ => Default::default(),
    };

    let has_split_name = !(author.first_name.is_empty() && author.last_name.is_empty());
    let (given_name, family_name) = if has_split_name {
        let given = cleanup_author(Some(&author.first_name))
            .unwrap_or_else(|| author.first_name.clone());
        (given, author.last_name.clone())
    } else {
        let cleaned = cleanup_author(Some(&author.full_name))
            .unwrap_or_else(|| author.full_name.clone());
        let (given, family, _) = split_person_name(&cleaned);
        (given, family)
    };

    let affiliations: Vec<Affiliation> = Some(author.affiliation)
        .filter(|name| !name.is_empty())
        .map(|name| vec![Affiliation { name, ..Default::default() }])
        .unwrap_or_default();

    let roles = normalize_contributor_roles(&["Author".to_string()], "Author");
    let display_name = format!("{} {}", given_name, family_name);
    let trimmed = display_name.trim();
    // An empty inferred type falls back to "Person", so only an explicit
    // "Organization" takes the organization branch.
    let kind = infer_contributor_type("Person", &orcid, &given_name, &family_name, trimmed, None);

    if kind == "Organization" {
        let name = if family_name.is_empty() { trimmed.to_string() } else { family_name };
        return Contributor::organization(
            Organization { id: orcid, name, asserted_by: String::new() },
            roles,
        );
    }
    Contributor::person(
        Person {
            id: orcid,
            given_name,
            family_name,
            affiliations,
            asserted_by: String::new(),
            ..Default::default()
        },
        roles,
    )
}
/// Converts one Europe PMC result record into a commonmeta `Data` work.
///
/// * `id` is the normalized DOI when present, else the PubMed page URL built from the
///   PMID, else empty.
/// * `date_published` prefers `firstPublicationDate`, then the journal's
///   `printPublicationDate`, then `pubYear`. `date_updated` is `dateOfRevision` unless it
///   equals `date_published`.
/// * The container is always built from `journalInfo` and `pageInfo`, so ISSN (falling
///   back to `essn`), volume and issue survive even when the journal title is missing.
///   With no journal data at all, every container field set here stays empty.
/// * Keywords and MeSH descriptor names both become subjects. Grants without an agency
///   are dropped.
/// * The license string is looked up in SPDX as given, then lower-cased with spaces
///   replaced by `-`. Unmatched values are kept verbatim as the license URL.
fn from_epmc_result(result: EpmcResult) -> Data {
    let id = if !result.doi.is_empty() {
        normalize_doi(&result.doi)
    } else if !result.pmid.is_empty() {
        format!("https://pubmed.ncbi.nlm.nih.gov/{}/", result.pmid)
    } else {
        String::new()
    };
    let type_ = epmc_type_to_commonmeta(&result.pub_type_list.pub_type).to_string();
    let contributors: Vec<Contributor> = result
        .author_list
        .author
        .into_iter()
        .map(epmc_author_to_contributor)
        .collect();
    let date_published = if !result.first_publication_date.is_empty() {
        result.first_publication_date.clone()
    } else if !result.journal_info.print_publication_date.is_empty() {
        result.journal_info.print_publication_date.clone()
    } else {
        result.pub_year.clone()
    };
    let date_updated = if !result.date_of_revision.is_empty()
        && result.date_of_revision != date_published
    {
        result.date_of_revision.clone()
    } else {
        String::new()
    };

    // Build the container unconditionally. Gating it on a journal title (or page info)
    // silently discarded ISSN, volume and issue for records without a title.
    let journal_info = &result.journal_info;
    let journal = &journal_info.journal;
    let issn = if journal.issn.is_empty() { journal.essn.clone() } else { journal.issn.clone() };
    let (first_page, last_page) = parse_pages(&result.page_info);
    let container = Container {
        type_: if journal.title.is_empty() { String::new() } else { "Journal".to_string() },
        title: journal.title.clone(),
        identifier_type: if issn.is_empty() { String::new() } else { "ISSN".to_string() },
        identifier: issn,
        volume: journal_info.volume.clone(),
        issue: journal_info.issue.clone(),
        first_page,
        last_page,
        ..Default::default()
    };

    let description = result.abstract_text;
    let mut subjects: Vec<Subject> = result
        .keyword_list
        .keyword
        .into_iter()
        .filter(|k| !k.is_empty())
        .map(|k| Subject { subject: k, ..Default::default() })
        .collect();
    subjects.extend(
        result
            .mesh_heading_list
            .mesh_heading
            .into_iter()
            .filter(|h| !h.descriptor_name.is_empty())
            .map(|h| Subject { subject: h.descriptor_name, ..Default::default() }),
    );
    let language = get_language(&result.language, "iso639-1");
    let license = if result.license.is_empty() {
        License::default()
    } else {
        let found = crate::spdx::search(&result.license).or_else(|| {
            let normalized = result.license.to_lowercase().replace(' ', "-");
            crate::spdx::search(&normalized)
        });
        match found {
            Some(l) => License {
                id: l.license_id.clone(),
                title: l.name.clone(),
                url: l.see_also.first().cloned().unwrap_or_default(),
                ..Default::default()
            },
            None => License { url: result.license, ..Default::default() },
        }
    };
    let mut identifiers: Vec<Identifier> = Vec::new();
    if !result.pmid.is_empty() {
        identifiers.push(Identifier {
            identifier: result.pmid,
            identifier_type: "PMID".to_string(),
            ..Default::default()
        });
    }
    if !result.pmcid.is_empty() {
        identifiers.push(Identifier {
            identifier: result.pmcid,
            identifier_type: "PMCID".to_string(),
            ..Default::default()
        });
    }
    if !result.arxiv_id.is_empty() {
        identifiers.push(Identifier {
            identifier: format!("https://arxiv.org/abs/{}", result.arxiv_id),
            identifier_type: "arXiv".to_string(),
            ..Default::default()
        });
    }
    let funding_references: Vec<FundingReference> = result
        .grants_list
        .grant
        .into_iter()
        .filter(|g| !g.agency.is_empty())
        .map(|g| FundingReference {
            funder_name: g.agency,
            award_number: g.grant_id,
            ..Default::default()
        })
        .collect();
    Data {
        id,
        type_,
        title: result.title,
        contributors,
        date_published,
        date_updated,
        container,
        description,
        subjects,
        language,
        license,
        identifiers,
        funding_references,
        provider: "PubMed".to_string(),
        ..Default::default()
    }
}
/// Creates the blocking `reqwest` client used for Europe PMC requests. It sends the
/// commonmeta `User-Agent` header.
fn build_client() -> reqwest::Result<reqwest::blocking::Client> {
    let builder = reqwest::blocking::Client::builder();
    let agent = crate::io_utils::commonmeta_user_agent();
    builder.user_agent(agent).build()
}
fn resolve_api_url(input: &str) -> Result<String> {
if let Some(rest) = input
.strip_prefix("https://europepmc.org/article/")
.or_else(|| input.strip_prefix("http://europepmc.org/article/"))
{
if let Some((source, id)) = rest.split_once('/') {
if matches!(source, "MED" | "PMC") && !id.is_empty() {
return Ok(format!("{EPMC_API_BASE}/article/{source}/{id}?format=json"));
}
}
}
let (id, id_type) = validate_id(input);
match id_type {
"DOI" => {
let doi = normalize_doi(&id);
let mut url = url::Url::parse(&format!("{EPMC_API_BASE}/search"))
.map_err(|e| Error::Parse(e.to_string()))?;
url.query_pairs_mut()
.append_pair("query", &format!("DOI:\"{}\"", doi))
.append_pair("resulttype", "core")
.append_pair("format", "json")
.append_pair("pageSize", "1");
Ok(url.to_string())
}
"PMID" => Ok(format!("{EPMC_API_BASE}/article/MED/{id}?format=json")),
"PMCID" => Ok(format!("{EPMC_API_BASE}/article/PMC/{id}?format=json")),
_ => Err(Error::Parse(format!(
"Cannot resolve Europe PMC API URL from '{}': expected PMID, PMCID, DOI, or URL",
input
))),
}
}
fn parse_epmc_response(text: &str) -> Result<Data> {
if let Ok(r) = serde_json::from_str::<EpmcArticleResponse>(text) {
return Ok(from_epmc_result(r.result));
}
let search: EpmcSearchResponse =
serde_json::from_str(text).map_err(|e| Error::Parse(e.to_string()))?;
search
.result_list
.result
.into_iter()
.next()
.map(from_epmc_result)
.ok_or_else(|| Error::Parse("No results found in Europe PMC response".to_string()))
}
pub fn read_json(input: &str) -> Result<Data> {
if let Ok(r) = serde_json::from_str::<EpmcArticleResponse>(input) {
return Ok(from_epmc_result(r.result));
}
if let Ok(r) = serde_json::from_str::<EpmcResult>(input) {
return Ok(from_epmc_result(r));
}
let search: EpmcSearchResponse =
serde_json::from_str(input).map_err(|e| Error::Parse(e.to_string()))?;
search
.result_list
.result
.into_iter()
.next()
.map(from_epmc_result)
.ok_or_else(|| Error::Parse("No results found in Europe PMC response".to_string()))
}
pub fn fetch(input: &str) -> Result<Data> {
let api_url = resolve_api_url(input)?;
let client = build_client().map_err(|e| Error::Http(e.to_string()))?;
let text = client
.get(&api_url)
.send()
.map_err(|e| Error::Http(e.to_string()))?
.error_for_status()
.map_err(|e| Error::Http(e.to_string()))?
.text()
.map_err(|e| Error::Http(e.to_string()))?;
parse_epmc_response(&text)
}
pub fn fetch_page(
query: &str,
page_size: usize,
cursor: &str,
) -> Result<(Vec<Data>, Option<String>)> {
let mut url = url::Url::parse(&format!("{EPMC_API_BASE}/search"))
.map_err(|e| Error::Parse(e.to_string()))?;
url.query_pairs_mut()
.append_pair("query", query)
.append_pair("resulttype", "core")
.append_pair("format", "json")
.append_pair("pageSize", &page_size.to_string())
.append_pair("cursorMark", cursor);
let client = build_client().map_err(|e| Error::Http(e.to_string()))?;
let text = client
.get(url.as_str())
.send()
.map_err(|e| Error::Http(e.to_string()))?
.error_for_status()
.map_err(|e| Error::Http(e.to_string()))?
.text()
.map_err(|e| Error::Http(e.to_string()))?;
let response: EpmcSearchResponse =
serde_json::from_str(&text).map_err(|e| Error::Parse(format!("Europe PMC search: {e}")))?;
let records: Vec<Data> = response.result_list.result.into_iter().map(from_epmc_result).collect();
let next = if records.len() < page_size || response.next_cursor_mark.is_empty() {
None
} else {
Some(response.next_cursor_mark)
};
Ok((records, next))
}
pub fn resolve_pmc_ids_path(
explicit: Option<&str>,
no_network: bool,
) -> Result<std::path::PathBuf> {
if let Some(p) = explicit {
let path = std::path::PathBuf::from(p);
if path.exists() {
return Ok(path);
}
return Err(Error::Parse(format!(
"pubmed: CSV not found at specified path: {}",
path.display()
)));
}
if let Ok(home) = std::env::var("HOME") {
let downloads = std::path::Path::new(&home)
.join("Downloads")
.join(PMC_IDS_CACHE_KEY);
if downloads.exists() {
eprintln!("pubmed: using {}", downloads.display());
return Ok(downloads);
}
}
if no_network {
return Err(Error::Parse(format!(
"pubmed: no local PMC-ids.csv.gz found and --no-network is set.\n\
Download from {PMC_IDS_URL} or place the file in ~/Downloads/"
)));
}
let (path, cached) = crate::io_utils::ensure_cached_path(
PMC_IDS_URL,
"pubmed",
PMC_IDS_CACHE_KEY,
PMC_IDS_CACHE_TTL,
)
.map_err(|e| Error::Parse(format!("pubmed: {e}")))?;
if cached {
eprintln!("pubmed: using cached file at {}", path.display());
} else {
eprintln!("pubmed: cached at {}", path.display());
}
Ok(path)
}
// Tests for the PMC-ids CSV importer (row conversion, streaming into SQLite, upsert
// semantics), PMC-ids path resolution, and the Europe PMC JSON reader / URL resolver.
#[cfg(test)]
mod tests {
use flate2::write::GzEncoder;
use flate2::Compression;
use std::io::Write;
use super::*;
use crate::formats::commonmeta::{count_sqlite_works, read_sqlite_by_id};
// Builds a CSV record from literal field values (no parsing involved).
fn make_record(fields: &[&str]) -> csv::StringRecord {
let mut r = csv::StringRecord::new();
for f in fields {
r.push_field(f);
}
r
}
// Gzip-compresses `content` in memory.
fn gz(content: &str) -> Vec<u8> {
let mut enc = GzEncoder::new(Vec::new(), Compression::default());
enc.write_all(content.as_bytes()).unwrap();
enc.finish().unwrap()
}
// Writes `bytes` to `dir/name` and returns the resulting path.
fn write_gz_file(dir: &std::path::Path, name: &str, bytes: Vec<u8>) -> std::path::PathBuf {
let p = dir.join(name);
std::fs::write(&p, bytes).unwrap();
p
}
// Header line of PMC-ids.csv; the trailing Release Date column is not read by `row_to_data`.
const HEADER: &str =
"Journal Title,ISSN,eISSN,Year,Volume,Issue,Page,DOI,PMCID,PMID,Manuscript Id,Release Date\n";
// A row with every column populated, indexed by the COL_* constants.
const FULL_ROW: &[&str] = &[
"Breast Cancer Res", "1465-5411", "1465-542X",
"2000", "3", "1", "55-61",
"10.1186/bcr271", "PMC13900", "11250746", "NIHMS123", "live",
];
// Every populated column lands in the expected container field or identifier.
#[test]
fn test_row_to_data_full() {
let r = make_record(FULL_ROW);
let data = row_to_data(&r).unwrap();
assert_eq!(data.id, "https://doi.org/10.1186/bcr271");
assert_eq!(data.type_, "JournalArticle");
assert_eq!(data.date_published, "2000");
assert_eq!(data.provider, "PubMed");
assert_eq!(data.container.title, "Breast Cancer Res");
assert_eq!(data.container.type_, "Journal");
assert_eq!(data.container.identifier, "1465-5411");
assert_eq!(data.container.identifier_type, "ISSN");
assert_eq!(data.container.volume, "3");
assert_eq!(data.container.issue, "1");
assert_eq!(data.container.first_page, "55");
assert_eq!(data.container.last_page, "61");
let pmid = data.identifiers.iter().find(|i| i.identifier_type == "PMID").unwrap();
assert_eq!(pmid.identifier, "11250746");
let pmcid = data.identifiers.iter().find(|i| i.identifier_type == "PMCID").unwrap();
assert_eq!(pmcid.identifier, "PMC13900");
let eissn = data.identifiers.iter().find(|i| i.scheme == "EISSN").unwrap();
assert_eq!(eissn.identifier, "1465-542X");
assert_eq!(eissn.identifier_type, "Other");
let ms = data.identifiers.iter().find(|i| i.identifier_type == "article_id").unwrap();
assert_eq!(ms.identifier, "NIHMS123");
assert!(ms.scheme.is_empty());
}
// A page without a dash becomes the first page only.
#[test]
fn test_row_to_data_single_page() {
let mut fields = FULL_ROW.to_vec();
fields[COL_PAGE] = "55";
let r = make_record(&fields);
let data = row_to_data(&r).unwrap();
assert_eq!(data.container.first_page, "55");
assert_eq!(data.container.last_page, "");
}
// Rows without a DOI are not converted at all.
#[test]
fn test_row_to_data_skips_empty_doi() {
let r = make_record(&["J", "", "", "2000", "", "", "", "", "PMC1", "1", "", "live"]);
assert!(row_to_data(&r).is_none());
}
// Empty container columns produce an empty container.
#[test]
fn test_row_to_data_omits_empty_container_fields() {
let r = make_record(&["", "", "", "", "", "", "", "10.1186/bcr271", "", "11250746", "", ""]);
let data = row_to_data(&r).unwrap();
assert!(data.container.is_empty());
assert_eq!(data.date_published, "");
}
// End-to-end: a gzipped CSV row round-trips through SQLite with all fields intact.
// NOTE(review): passes no_network = false, so it may perform a registration-agency
// lookup unless the prefix is already cached — confirm this is acceptable in CI.
#[test]
fn test_stream_writes_record_with_all_fields() {
let dir = std::env::temp_dir().join("commonmeta_pubmed_stream_all_fields");
std::fs::create_dir_all(&dir).unwrap();
let csv = format!(
"{HEADER}Breast Cancer Res,1465-5411,1465-542X,2000,3,1,55-61,10.1186/bcr271,PMC13900,11250746,NIHMS123,live\n"
);
let gz_path = write_gz_file(&dir, "pmc.csv.gz", gz(&csv));
let db_path = dir.join("out.sqlite3");
let n = stream_pmc_ids_to_sqlite(&gz_path, &db_path, 0, false).unwrap();
assert_eq!(n, 1);
assert_eq!(count_sqlite_works(&db_path).unwrap(), 1);
let data = read_sqlite_by_id("https://doi.org/10.1186/bcr271", &db_path)
.unwrap()
.unwrap();
assert_eq!(data.type_, "JournalArticle");
assert_eq!(data.date_published, "2000");
assert_eq!(data.container.title, "Breast Cancer Res");
assert_eq!(data.container.identifier, "1465-5411");
assert_eq!(data.container.identifier_type, "ISSN");
assert_eq!(data.container.volume, "3");
assert_eq!(data.container.issue, "1");
assert_eq!(data.container.first_page, "55");
assert_eq!(data.container.last_page, "61");
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMID" && i.identifier == "11250746"));
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMCID" && i.identifier == "PMC13900"));
assert!(data.identifiers.iter().any(|i| i.identifier_type == "Other" && i.scheme == "EISSN" && i.identifier == "1465-542X"));
assert!(data.identifiers.iter().any(|i| i.identifier_type == "article_id" && i.identifier == "NIHMS123"));
std::fs::remove_dir_all(&dir).ok();
}
// Rows lacking a DOI are skipped during streaming, not written.
#[test]
fn test_stream_skips_rows_without_doi() {
let dir = std::env::temp_dir().join("commonmeta_pubmed_stream_skip_no_doi");
std::fs::create_dir_all(&dir).unwrap();
let csv = format!(
"{HEADER}\
J1,,,2020,,,,10.1186/aaa,,1,,live\n\
J2,,,2020,,,,,,2,,live\n\
J3,,,2020,,,,10.1186/bbb,,3,,live\n"
);
let gz_path = write_gz_file(&dir, "pmc.csv.gz", gz(&csv));
let db_path = dir.join("out.sqlite3");
let n = stream_pmc_ids_to_sqlite(&gz_path, &db_path, 0, false).unwrap();
assert_eq!(n, 2, "only rows with a DOI should be written");
assert_eq!(count_sqlite_works(&db_path).unwrap(), 2);
std::fs::remove_dir_all(&dir).ok();
}
// A non-zero limit stops reading once that many rows are written or pending.
#[test]
fn test_stream_respects_limit() {
let dir = std::env::temp_dir().join("commonmeta_pubmed_stream_limit");
std::fs::create_dir_all(&dir).unwrap();
let csv = format!(
"{HEADER}\
J,,,2020,,,,10.1186/a,,1,,live\n\
J,,,2020,,,,10.1186/b,,2,,live\n\
J,,,2020,,,,10.1186/c,,3,,live\n"
);
let gz_path = write_gz_file(&dir, "pmc.csv.gz", gz(&csv));
let db_path = dir.join("out.sqlite3");
let n = stream_pmc_ids_to_sqlite(&gz_path, &db_path, 2, false).unwrap();
assert_eq!(n, 2);
assert_eq!(count_sqlite_works(&db_path).unwrap(), 2);
std::fs::remove_dir_all(&dir).ok();
}
// Re-importing a stored DOI only adds missing PMC identifiers: other fields (here
// date_published) and already-present identifier types are left unchanged.
#[test]
fn test_stream_existing_record_only_updates_identifiers() {
let dir = std::env::temp_dir().join("commonmeta_pubmed_stream_upsert");
std::fs::create_dir_all(&dir).unwrap();
let db_path = dir.join("out.sqlite3");
// The " live" Release Date value has a leading space; that column is not read.
let csv1 = format!("{HEADER}J,1111-2222,,2000,,,,10.1186/x,,,, live\n");
let gz1 = write_gz_file(&dir, "pmc1.csv.gz", gz(&csv1));
stream_pmc_ids_to_sqlite(&gz1, &db_path, 0, false).unwrap();
let csv2 = format!("{HEADER}J,1111-2222,,2001,,,,10.1186/x,PMC99,11111,,live\n");
let gz2 = write_gz_file(&dir, "pmc2.csv.gz", gz(&csv2));
stream_pmc_ids_to_sqlite(&gz2, &db_path, 0, false).unwrap();
assert_eq!(count_sqlite_works(&db_path).unwrap(), 1);
let data = read_sqlite_by_id("https://doi.org/10.1186/x", &db_path).unwrap().unwrap();
assert_eq!(data.date_published, "2000", "date_published must not change on update");
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMID" && i.identifier == "11111"),
"PMID must be added when record had no PMID");
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMCID" && i.identifier == "PMC99"),
"PMCID must be added when record had no PMCID");
let csv3 = format!("{HEADER}J,1111-2222,,2002,,,,10.1186/x,PMC999,99999,,live\n");
let gz3 = write_gz_file(&dir, "pmc3.csv.gz", gz(&csv3));
stream_pmc_ids_to_sqlite(&gz3, &db_path, 0, false).unwrap();
let data = read_sqlite_by_id("https://doi.org/10.1186/x", &db_path).unwrap().unwrap();
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMID" && i.identifier == "11111"),
"existing PMID must not be overwritten");
assert!(!data.identifiers.iter().any(|i| i.identifier_type == "PMID" && i.identifier == "99999"),
"new PMID from third import must not replace existing one");
std::fs::remove_dir_all(&dir).ok();
}
// An existing explicit path is returned as-is.
#[test]
fn test_resolve_explicit_path_exists() {
let dir = std::env::temp_dir().join("commonmeta_pubmed_resolve_explicit");
std::fs::create_dir_all(&dir).unwrap();
let p = dir.join("pmc.csv.gz");
std::fs::write(&p, gz("header\n")).unwrap();
let result = resolve_pmc_ids_path(Some(p.to_str().unwrap()), true).unwrap();
assert_eq!(result, p);
std::fs::remove_dir_all(&dir).ok();
}
// A missing explicit path is an error (no fallback to ~/Downloads or the cache).
#[test]
fn test_resolve_explicit_path_missing_returns_error() {
let result = resolve_pmc_ids_path(Some("/nonexistent/PMC-ids.csv.gz"), true);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("not found at specified path"));
}
// With no local file and no network, resolution fails with the --no-network hint.
// NOTE(review): this temporarily replaces the process-wide HOME variable, which can race
// with other tests that read HOME while the test harness runs tests in parallel.
#[test]
fn test_resolve_no_file_no_network_returns_error() {
let orig_home = std::env::var("HOME").unwrap_or_default();
let dir = std::env::temp_dir().join("commonmeta_pubmed_resolve_no_net");
std::fs::create_dir_all(&dir).unwrap();
unsafe { std::env::set_var("HOME", &dir) };
let result = resolve_pmc_ids_path(None, true);
unsafe { std::env::set_var("HOME", &orig_home) };
std::fs::remove_dir_all(&dir).ok();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("--no-network"));
}
// Full `/article` envelope: all mapped fields (dates, container, contributors,
// identifiers, subjects, funding, license) come through.
#[test]
fn test_read_json_epmc_article_envelope() {
let json = r#"{
"version": "6.8",
"result": {
"pmid": "25368845",
"pmcid": "PMC4202721",
"doi": "10.7554/elife.01561",
"title": "Epidemiology of Burkholderia pseudomallei in Australia",
"authorList": {
"author": [
{
"fullName": "Limmathurotsakul D",
"firstName": "Direk",
"lastName": "Limmathurotsakul",
"affiliation": "University of Oxford"
}
]
},
"journalInfo": {
"issue": "1",
"volume": "3",
"printPublicationDate": "2014-01-01",
"journal": {
"title": "eLife",
"issn": "2050-084X"
}
},
"pubYear": "2014",
"pageInfo": "e01561",
"abstractText": "The bacterium Burkholderia pseudomallei causes melioidosis.",
"language": "eng",
"pubTypeList": { "pubType": ["research-article"] },
"keywordList": { "keyword": ["melioidosis", "Australia"] },
"grantsList": {
"grant": [{ "grantId": "MR/K006924/1", "agency": "Medical Research Council" }]
},
"license": "cc by",
"firstPublicationDate": "2014-01-21",
"dateOfRevision": "2024-09-01"
}
}"#;
let data = read_json(json).unwrap();
assert_eq!(data.id, "https://doi.org/10.7554/elife.01561");
assert_eq!(data.type_, "JournalArticle");
assert_eq!(data.title, "Epidemiology of Burkholderia pseudomallei in Australia");
assert_eq!(data.date_published, "2014-01-21");
assert_eq!(data.date_updated, "2024-09-01");
assert_eq!(data.language, "en");
assert_eq!(data.provider, "PubMed");
assert_eq!(data.container.title, "eLife");
assert_eq!(data.container.identifier, "2050-084X");
assert_eq!(data.container.volume, "3");
assert_eq!(data.container.issue, "1");
assert_eq!(data.container.first_page, "e01561");
assert_eq!(data.contributors.len(), 1);
let c = &data.contributors[0];
assert_eq!(c.given_name(), "Direk");
assert_eq!(c.family_name(), "Limmathurotsakul");
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMID" && i.identifier == "25368845"));
assert!(data.identifiers.iter().any(|i| i.identifier_type == "PMCID" && i.identifier == "PMC4202721"));
assert_eq!(data.subjects.len(), 2);
assert!(data.subjects.iter().any(|s| s.subject == "melioidosis"));
assert_eq!(data.funding_references.len(), 1);
assert_eq!(data.funding_references[0].funder_name, "Medical Research Council");
assert_eq!(data.funding_references[0].award_number, "MR/K006924/1");
assert!(!data.license.url.is_empty());
}
// Without a DOI, the id falls back to the PubMed landing-page URL.
#[test]
fn test_read_json_epmc_pmid_only() {
let json = r#"{
"version": "6.8",
"result": {
"pmid": "12345678",
"title": "A test article",
"pubYear": "2020",
"language": "eng",
"pubTypeList": { "pubType": ["research-article"] }
}
}"#;
let data = read_json(json).unwrap();
assert_eq!(data.id, "https://pubmed.ncbi.nlm.nih.gov/12345678/");
assert_eq!(data.type_, "JournalArticle");
}
#[test]
fn test_resolve_api_url_pmid() {
let url = resolve_api_url("25368845").unwrap();
assert!(url.contains("/article/MED/25368845"));
}
// The expected path implies `validate_id` returns the PMCID without its "PMC" prefix.
#[test]
fn test_resolve_api_url_pmcid() {
let url = resolve_api_url("PMC4202721").unwrap();
assert!(url.contains("/article/PMC/4202721"));
}
#[test]
fn test_resolve_api_url_europepmc_url() {
let url = resolve_api_url("https://europepmc.org/article/MED/25368845").unwrap();
assert!(url.contains("/article/MED/25368845"));
}
// DOIs go through the search endpoint rather than /article.
#[test]
fn test_resolve_api_url_doi() {
let url = resolve_api_url("https://doi.org/10.7554/elife.01561").unwrap();
assert!(url.contains("/search") && url.contains("DOI"));
}
#[test]
fn test_resolve_api_url_unknown_returns_error() {
let result = resolve_api_url("not-an-identifier");
assert!(result.is_err());
}
}