use std::path::Path;
use std::time::Instant;
use clap::{Arg, ArgAction, ArgMatches, Command};
use crate::{io_utils};
use crate::cmd::{
resolve_db_path, resolve_cache_db_path,
DATACITE_ANNUAL_HOST,
PIDBOX_CACHE_KEY, PIDBOX_URL, VRAIX_CACHE_TTL,
};
use crate::cmd::convert::{detect_format, ra_to_reader};
use crate::cmd::list::{fetch_list_from_api, fmt_wrote_sqlite};
/// Builds the clap definition of the `import` subcommand.
///
/// Declares the optional positional `input`, the `--from` source selector
/// (default `commonmeta`), the paging/filter options, and the mode switches
/// (`--sample`, `--s3`, `--date`, `--no-network`, `--force`, …). The argument
/// ids declared here are the ones `execute` reads back from `ArgMatches`.
pub fn command() -> Command {
    Command::new("import")
        .about("Import scholarly metadata into the local commonmeta database")
        .long_about(
            "Download and import scholarly metadata into the local commonmeta SQLite \
             database (always upserts — existing records are updated, not replaced).\n\n\
             The output path defaults to the COMMONMETA_DB environment variable or \
             the platform default (~/Library/Application Support/commonmeta/commonmeta.sqlite3 \
             on macOS, /var/lib/commonmeta/commonmeta.sqlite3 on Linux).\n\n\
             Single record:\n\
             commonmeta import 10.7554/elife.01561\n\
             commonmeta import https://doi.org/10.7554/elife.01561\n\n\
             Annual public data files:\n\
             commonmeta import --from crossref --s3 # March 2026 corpus from S3 requester-pays bucket (~$18)\n\
             commonmeta import \"https://datafiles.datacite.org/datafiles/public-2025/download?token=<TOKEN>\"\n\
             # DataCite 2025 (108 M records, 33 GB); token from\n\
             # https://datafiles.datacite.org/datafiles/public-2025\n\
             # TAR cached at ~/Library/Caches/commonmeta/datacite/public-2025.tar\n\
             commonmeta import --from datacite # re-import from cached TAR (no token needed)\n\
             commonmeta import --from datacite --sample # random sample (max 1000) via the DataCite API\n\n\
             Daily VRAIX dumps:\n\
             commonmeta import --from crossref --date 2026-06-15\n\
             commonmeta import --from datacite --date 2026-06-15\n\
             commonmeta import crossref-2026-06-15.sqlite3\n\n\
             Dragoman cache import:\n\
             commonmeta import --from cache\n\
             commonmeta import --from cache --cache-db /path/to/cache.sqlite3\n\
             CACHE_DB=/path/to/cache.sqlite3 commonmeta import --from cache\n\n\
             API fetch:\n\
             commonmeta import --from crossref --number 100 --member 78\n\
             commonmeta import --from datacite --number 100 --client cern.zenodo\n\
             commonmeta import --from openalex --number 100 --type journal-article\n\n\
             Vocabulary installs:\n\
             commonmeta import --from ror\n\
             commonmeta import --from pidbox\n\
             commonmeta import --from geonames\n\n\
             ORCID:\n\
             commonmeta import https://orcid.org/0000-0003-1419-2405 # single person\n\
             commonmeta import --from orcid # bulk: auto-discover summaries URL\n\
             # from figshare and import into people\n\
             # table (~46 GB, not the 221 GB bundle)\n\
             commonmeta import --from orcid --list-releases # print summaries URL without importing\n\
             # (run locally; copy URL to server)\n\
             commonmeta import --from orcid \"<SUMMARIES_URL>\" # bulk with direct URL (for servers\n\
             # where api.figshare.com is blocked)",
        )
        // Positional input: its kind is auto-detected by `execute`.
        .arg(
            Arg::new("input")
                .help(
                    "DOI, ORCID iD, ROR ID, VRAIX SQLite path, or data file download URL \
                     (auto-detected)",
                )
                .required(false)
                .index(1),
        )
        // Source selector; the list mirrors the values `execute` accepts.
        .arg(
            Arg::new("from")
                .long("from")
                .short('f')
                .help(
                    "Source format: crossref, datacite, cache, geonames, openalex, orcid, \
                     pidbox, prefixes, pubmed, ror",
                )
                .default_value("commonmeta"),
        )
        // Paging options for API fetches.
        .arg(
            Arg::new("number")
                .long("number")
                .help("Number of records to fetch via API (file and date inputs always import all)")
                .value_parser(clap::value_parser!(usize))
                .default_value("0"),
        )
        .arg(
            Arg::new("page")
                .long("page")
                .help("Page number for API fetches (1-based)")
                .value_parser(clap::value_parser!(usize))
                .default_value("1"),
        )
        // String-valued filters.
        .arg(Arg::new("member").long("member").help("Crossref member ID"))
        .arg(Arg::new("client").long("client").help("DataCite client ID"))
        .arg(Arg::new("type").long("type").help("Work type filter"))
        .arg(Arg::new("year").long("year").help("Publication year"))
        .arg(Arg::new("language").long("language").help("Language filter"))
        .arg(Arg::new("orcid").long("orcid").help("Filter by ORCID"))
        .arg(Arg::new("ror").long("ror").help("Filter by ROR"))
        .arg(Arg::new("affiliation").long("affiliation").help("Affiliation name filter"))
        .arg(Arg::new("country").long("country").help("Country code filter"))
        .arg(
            Arg::new("date-updated")
                .long("date-updated")
                .help("Filter by date updated (YYYY-MM-DD)"),
        )
        .arg(Arg::new("from-host").long("from-host").help("InvenioRDM source host"))
        .arg(Arg::new("from-token").long("from-token").help("InvenioRDM source API token"))
        .arg(Arg::new("community").long("community").help("InvenioRDM community slug"))
        .arg(Arg::new("subject").long("subject").help("Subject area filter"))
        .arg(Arg::new("depositor").long("depositor").help("Crossref depositor name"))
        .arg(Arg::new("registrant").long("registrant").help("Crossref registrant name"))
        .arg(
            Arg::new("email")
                .long("email")
                .help("Email for OpenAlex mailto parameter"),
        )
        // Sampling switch; its meaning depends on the selected source.
        .arg(
            Arg::new("sample")
                .long("sample")
                .help(
                    "Return a random sample via API (crossref: max 100, openalex: max 200). \
                     Crossref --s3: processes first 1000 records from cached TAR (cache must exist). \
                     DataCite annual URL: streams TAR and stops after 1000 records. \
                     --from datacite (no URL): uses DataCite API random sample (max 1000) instead of the cached TAR. \
                     ORCID bulk import: stops after 1000 people; with a URL and no cache, \
                     streams directly without downloading the full ~46 GB file.",
                )
                .action(ArgAction::SetTrue),
        )
        // Boolean "has-…" filters.
        .arg(
            Arg::new("has-orcid")
                .long("has-orcid")
                .help("Filter for records with ORCID")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-ror-id")
                .long("has-ror-id")
                .help("Filter for records with ROR")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-references")
                .long("has-references")
                .help("Filter for records with references")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-relation")
                .long("has-relation")
                .help("Filter for records with relation")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-abstract")
                .long("has-abstract")
                .help("Filter for records with abstract")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-award")
                .long("has-award")
                .help("Filter for records with award")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-license")
                .long("has-license")
                .help("Filter for records with license")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("has-archive")
                .long("has-archive")
                .help("Filter for records with archive")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("is-archived")
                .long("is-archived")
                .help("Filter for archived records")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("vocabulary")
                .long("vocabulary")
                .help("Output as vocabulary (e.g. InvenioRDM affiliations YAML)")
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("match")
                .long("match")
                .help("Enable ROR affiliation matching when reading crossref and datacite records")
                .default_value("true")
                .value_parser(clap::value_parser!(bool)),
        )
        // Bulk-import mode switches.
        .arg(Arg::new("date").long("date").help(
            "Date (YYYY-MM-DD) of a VRAIX daily dump; downloads \
             {from}-{date}.sqlite3.zst from metadata.vraix.org when no input \
             file path is given",
        ))
        .arg(
            Arg::new("s3")
                .long("s3")
                .help(
                    "Crossref annual: download from the Crossref S3 bucket \
                     (s3://api-snapshots-reqpays-crossref). \
                     Requires the AWS CLI to be installed and configured with credentials; \
                     the requester pays bandwidth costs (~$18 for the full 2025 file). \
                     With --sample, processes the first 1000 records from the cached TAR \
                     (cache must exist; run without --sample first to download).",
                )
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("no-network")
                .long("no-network")
                .help("Disable all outbound network requests; only local .sqlite3 file imports are allowed")
                .action(ArgAction::SetTrue),
        )
        // Source-specific paths.
        .arg(
            Arg::new("cache-db")
                .long("cache-db")
                .help(
                    "Path to the cache SQLite database \
                     (default: $CACHE_DB env var, or /var/lib/commonmeta/cache.sqlite3 on Linux). \
                     Only used with --from cache.",
                ),
        )
        .arg(
            Arg::new("pubmed-csv")
                .long("pubmed-csv")
                .help(
                    "Path to a local PMC-ids.csv.gz file. \
                     Defaults to ~/Downloads/PMC-ids.csv.gz, then the platform cache. \
                     Only used with --from pubmed.",
                ),
        )
        .arg(
            Arg::new("list-releases")
                .long("list-releases")
                .help(
                    "Print the latest ORCID Public Data File release info and exit \
                     (requires api.figshare.com access). Use this on a local machine \
                     to obtain the summaries URL for servers where api.figshare.com \
                     is blocked. Only used with --from orcid.",
                )
                .action(ArgAction::SetTrue),
        )
        .arg(
            Arg::new("people-db")
                .long("people-db")
                .help(
                    "Path to the SQLite database for the 'people' table \
                     (ORCID records). Defaults to the same path as the main database.",
                ),
        )
        .arg(
            Arg::new("force")
                .long("force")
                .action(ArgAction::SetTrue)
                .help("Re-import even when the installed version is already up to date (ror, geonames)"),
        )
}
/// Runs the `import` subcommand for the parsed `matches`.
///
/// The first applicable route wins, in this order:
/// 1. positional URL on `DATACITE_ANNUAL_HOST` → DataCite annual TAR import;
/// 2. ORCID iD input or `--from orcid` → single person / bulk ORCID import;
/// 3. ROR ID input → works import for that organisation;
/// 4. `ror`, `prefixes`, `pidbox`, `geonames`, `cache`, `pubmed` sources;
/// 5. VRAIX SQLite dump (local file or `--date` download);
/// 6. Crossref annual corpus (`--s3`) or cached DataCite annual TAR;
/// 7. commonmeta SQLite file;
/// 8. single identifier;
/// 9. API list fetch followed by an upsert.
///
/// # Errors
/// Returns a human-readable message for unsupported `--from` values,
/// `--no-network` conflicts, missing caches/databases, and any failure
/// reported by the delegated import routine.
pub fn execute(matches: &ArgMatches) -> Result<(), String> {
    let input_path = matches.get_one::<String>("input").map(String::as_str);
    let date = matches.get_one::<String>("date").map(String::as_str);
    let no_network = matches.get_flag("no-network");
    let force = matches.get_flag("force");
    let sample = matches.get_flag("sample");
    let number = *matches.get_one::<usize>("number").unwrap_or(&0);
    // True when a string option was given with a non-empty value.
    let has_value = |id: &str| {
        matches
            .get_one::<String>(id)
            .map(|s| !s.is_empty())
            .unwrap_or(false)
    };

    // 1. DataCite annual download URL, recognised by its host.
    let datacite_annual_url = input_path.filter(|p| {
        url::Url::parse(p)
            .ok()
            .and_then(|u| u.host_str().map(|h| h == DATACITE_ANNUAL_HOST))
            .unwrap_or(false)
    });
    if let Some(url) = datacite_annual_url {
        return import_datacite_annual(url, sample, no_network);
    }

    let is_sqlite_input = input_path
        .map(|p| io_utils::get_extension(p, ".json").1 == ".sqlite3")
        .unwrap_or(false);
    // A `crossref-…` / `datacite-…` file stem overrides `--from` for SQLite inputs.
    let filename_source: Option<&'static str> = if is_sqlite_input {
        input_path
            .and_then(|p| std::path::Path::new(p).file_stem()?.to_str())
            .and_then(|stem| {
                if stem.starts_with("crossref-") {
                    Some("crossref")
                } else if stem.starts_with("datacite-") {
                    Some("datacite")
                } else {
                    None
                }
            })
    } else {
        None
    };
    let from_explicit = matches
        .get_one::<String>("from")
        .map(String::as_str)
        .unwrap_or("commonmeta");
    let from_flag: &str = filename_source.unwrap_or(from_explicit);
    // Without input and without an explicit source, `--member` implies
    // crossref and `--client` implies datacite.
    let from_flag = if from_flag == "commonmeta" && input_path.is_none() {
        if has_value("member") {
            "crossref"
        } else if has_value("client") {
            "datacite"
        } else {
            from_flag
        }
    } else {
        from_flag
    };

    // 2. ORCID: single person by iD, or bulk public data file.
    let input_is_orcid = input_path
        .map(|p| crate::utils::validate_id(p).1 == "ORCID")
        .unwrap_or(false);
    if input_is_orcid || from_flag == "orcid" {
        let works_db = resolve_db_path(None);
        let people_db_arg = matches
            .get_one::<String>("people-db")
            .map(String::as_str)
            .unwrap_or("");
        let people_db = if people_db_arg.is_empty() {
            works_db.clone()
        } else {
            people_db_arg.to_string()
        };
        if matches.get_flag("list-releases") {
            let release = crate::fetch_latest_orcid_release().map_err(|e| e.to_string())?;
            println!("ORCID Public Data File — latest release");
            println!(" Year/batch : {}_{}", release.year, release.batch);
            println!(" Filename : {}", release.filename);
            println!(" Size : {:.1} GB compressed", release.size_bytes as f64 / 1_073_741_824.0);
            println!(" SUMMARIES : {}", release.download_url);
            println!();
            println!("To import on a server where api.figshare.com is blocked:");
            println!(" commonmeta import --from orcid \"{}\"", release.download_url);
            return Ok(());
        }
        match input_path {
            Some(raw) if input_is_orcid => {
                if no_network {
                    return Err("--no-network: ORCID API fetch requires network access; remove --no-network".to_string());
                }
                return import_orcid_person(raw, &people_db, &works_db);
            }
            url_arg => {
                // Only an http(s) argument is treated as a summaries URL override.
                let url_override = url_arg.filter(|u| u.starts_with("http"));
                let count = crate::import_orcid_public_data(
                    std::path::Path::new(&people_db),
                    url_override,
                    no_network,
                    sample,
                )
                .map_err(|e| e.to_string())?;
                if count > 0 {
                    println!("imported {} people from ORCID Public Data File into {}", count, people_db);
                }
                return Ok(());
            }
        }
    }

    // 3. ROR ID as input: import works for that organisation.
    let input_is_ror = input_path
        .map(|p| crate::utils::validate_id(p).1 == "ROR")
        .unwrap_or(false);
    if let Some(raw) = input_path.filter(|_| input_is_ror) {
        if no_network {
            return Err("--no-network: ROR works fetch requires network access; remove --no-network".to_string());
        }
        let out_path = resolve_db_path(None);
        let page = *matches.get_one::<usize>("page").unwrap_or(&1);
        return import_ror_works(raw, &out_path, number, page);
    }

    // A bare source keyword given positionally acts like `--from <keyword>`.
    let (from, input_path) = match input_path {
        Some(s)
            if from_flag == "commonmeta"
                && !is_sqlite_input
                && matches!(s, "cache" | "crossref" | "geonames" | "openalex" | "pidbox" | "prefixes" | "pubmed" | "ror") =>
        {
            (s, None)
        }
        _ => (from_flag, input_path),
    };
    if !matches!(from, "cache" | "crossref" | "datacite" | "geonames" | "openalex" | "pidbox" | "prefixes" | "pubmed" | "ror" | "commonmeta") {
        return Err(format!(
            "import: unsupported --from value '{}' (supported: cache, crossref, datacite, geonames, openalex, pidbox, prefixes, pubmed, ror)",
            from
        ));
    }
    // Offline mode only permits local SQLite inputs; pubmed applies its own checks below.
    if no_network && !(is_sqlite_input && input_path.is_some()) && from != "pubmed" {
        return Err(
            "--no-network requires a local .sqlite3 input file; \
             provide a VRAIX dump path or remove --no-network"
                .to_string(),
        );
    }

    // 4. Vocabulary installs and special sources.
    if from == "ror" {
        let out_path = resolve_db_path(None);
        return install_ror(&out_path, force);
    }
    if from == "prefixes" {
        // NOTE(review): `command()` does not declare a `file` argument; this
        // presumably relies on a global arg defined by the parent command — confirm.
        let db_path = resolve_db_path(matches.get_one::<String>("file"));
        if !std::path::Path::new(&db_path).exists() {
            return Err(format!("database not found: {}", db_path));
        }
        eprintln!("import: resolving DOI prefixes from {}", db_path);
        let start = std::time::Instant::now();
        let n = crate::import_prefixes(std::path::Path::new(&db_path))
            .map_err(|e| e.to_string())?;
        eprintln!("import: resolved {} prefix(es) in {:.2?}", n, start.elapsed());
        return Ok(());
    }
    if from == "pidbox" {
        let out_path = resolve_db_path(None);
        return install_pidbox(&out_path);
    }
    if from == "geonames" {
        let out_path = resolve_db_path(None);
        return install_geonames(&out_path, force);
    }
    if from == "cache" {
        let out_path = resolve_db_path(None);
        let cache_path = resolve_cache_db_path(matches.get_one::<String>("cache-db"));
        let people_db_arg = matches
            .get_one::<String>("people-db")
            .map(String::as_str)
            .unwrap_or("");
        let people_path = if people_db_arg.is_empty() {
            out_path.clone()
        } else {
            people_db_arg.to_string()
        };
        return import_cache(&cache_path, &out_path, &people_path);
    }
    if from == "pubmed" {
        let out_path = resolve_db_path(None);
        if let Some(identifier) = input_path {
            let looks_like_csv = identifier.ends_with(".csv.gz") || identifier.ends_with(".csv");
            if !looks_like_csv {
                if no_network {
                    return Err("--no-network: Europe PMC API fetch requires network access; remove --no-network".to_string());
                }
                return import_single(identifier, "pubmed", &out_path);
            }
        }
        if number > 0 {
            // `--number` selects the API list fetch at the end of this function.
            if no_network {
                return Err("--no-network: Europe PMC API fetch requires network access; remove --no-network".to_string());
            }
        } else {
            let explicit_csv = matches.get_one::<String>("pubmed-csv").map(String::as_str);
            return import_pubmed(explicit_csv, &out_path, no_network);
        }
    }

    // 5. VRAIX daily dump: local SQLite file or `--date` download.
    let out_path = resolve_db_path(None);
    let is_vraix_sqlite = is_sqlite_input && matches!(from, "crossref" | "datacite");
    let is_date_download = date.is_some() && input_path.is_none() && matches!(from, "crossref" | "datacite");
    if is_vraix_sqlite || is_date_download {
        return import_vraix_fast(from, input_path, date, &out_path);
    }

    // `--sample` is kept apart from the record filters so that `--s3 --sample`
    // can reach the cached-TAR sampler instead of falling through to the API.
    let has_record_filters = has_value("member")
        || has_value("client")
        || has_value("orcid")
        || has_value("ror")
        || number > 0;
    let has_api_filters = has_record_filters || sample;

    // 6a. Crossref annual corpus.
    let s3 = matches.get_flag("s3");
    let is_crossref_bulk = matches!(from, "crossref")
        && input_path.is_none()
        && date.is_none()
        && !has_record_filters;
    if is_crossref_bulk {
        if s3 {
            return import_crossref_s3(sample, no_network);
        }
        if !sample {
            return Err(
                "import: 'commonmeta import --from crossref' requires a filter or mode:\n \
                 --sample fetch up to 100 random records via the Crossref API\n \
                 --number N fetch N records via the Crossref API\n \
                 --member ID filter by member (implies API fetch)\n \
                 --s3 download the full annual corpus from S3 (~$18)".to_string()
            );
        }
        // `--sample` without `--s3` continues to the API fetch below.
    }

    // 6b. DataCite annual corpus, re-imported from the cached TAR.
    let cache_path = io_utils::cache_dir("datacite").join("public-2025.tar");
    let is_datacite_annual_cmd = from == "datacite"
        && input_path.is_none()
        && date.is_none()
        && !has_api_filters;
    if is_datacite_annual_cmd {
        if !cache_path.exists() || cache_path.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
            return Err(format!(
                "import: no cached DataCite 2025 TAR found at {}.\n\
                 Obtain a download URL from https://datafiles.datacite.org/datafiles/public-2025\n\
                 and run: commonmeta import \"<URL>\"",
                cache_path.display()
            ));
        }
        // `--sample` is never set on this route (it counts as an API filter).
        return import_datacite_annual("", false, no_network);
    }

    // 7. commonmeta SQLite file.
    if is_sqlite_input && from == "commonmeta" {
        if let Some(src) = input_path {
            return import_commonmeta_sqlite(src, &out_path);
        }
    }

    // 8. Single identifier; the reader is auto-detected unless `--from` was given.
    if let Some(identifier) = input_path {
        if !is_sqlite_input {
            let effective_from = if from_explicit == "commonmeta" {
                let ra = detect_format(identifier, no_network);
                match ra_to_reader(&ra) {
                    Some(reader) => reader.to_string(),
                    None => return Err(format!(
                        "import: no reader available for DOI registration agency '{}' \
                         (use --from to specify a supported format)",
                        ra
                    )),
                }
            } else {
                from_explicit.to_string()
            };
            return import_single(identifier, &effective_from, &out_path);
        }
    }
    if from == "commonmeta" {
        return Err(
            "import: --from commonmeta requires an input .sqlite3 file path".to_string()
        );
    }

    // 9. API list fetch, then upsert into the output database.
    let fetch_start = Instant::now();
    let data = fetch_list_from_api(matches, from)?;
    eprintln!(
        "import: fetch took {:.2?} ({} records)",
        fetch_start.elapsed(),
        data.len()
    );
    let out_sqlite = Path::new(&out_path);
    let write_start = Instant::now();
    crate::upsert_sqlite(&data, out_sqlite).map_err(|e| e.to_string())?;
    let total = crate::count_sqlite_works(out_sqlite).ok();
    eprintln!(
        "import: upsert took {:.2?} ({} records)",
        write_start.elapsed(),
        data.len()
    );
    println!("{}", fmt_wrote_sqlite(&out_path, data.len(), total));
    Ok(())
}
/// Imports the DataCite 2025 annual data file.
///
/// * With a non-empty cached TAR, the cache is processed and `url` is ignored.
/// * Without a cache and with `sample`, the HTTP response is streamed straight
///   into the importer and nothing is cached.
/// * Without a cache and without `sample`, the TAR is downloaded to the cache
///   and then processed. The download goes to a `.part` file that is renamed
///   only after it completes, so an interrupted transfer is never mistaken
///   for a valid cache on the next run.
///
/// `sample` limits processing to the first 1000 lines.
///
/// # Errors
/// Fails when no cache exists and `url` is empty or `no_network` is set, on
/// HTTP or file-system errors, and on any error from
/// `process_datacite_archive`.
fn import_datacite_annual(url: &str, sample: bool, no_network: bool) -> Result<(), String> {
    use std::io::BufReader;
    use tar::Archive;

    const DATACITE_SAMPLE_LINES: usize = 1_000;
    let limit = if sample { DATACITE_SAMPLE_LINES } else { usize::MAX };

    let cache_path = io_utils::cache_dir("datacite").join("public-2025.tar");
    let cached = cache_path.exists() && cache_path.metadata().map(|m| m.len()).unwrap_or(0) > 0;

    if cached {
        eprintln!("import: using cached DataCite 2025 TAR at {}", cache_path.display());
    } else {
        if url.is_empty() {
            return Err(format!(
                "import: no cached DataCite 2025 TAR at {}; \
                 obtain a download URL from https://datafiles.datacite.org/datafiles/public-2025",
                cache_path.display()
            ));
        }
        if no_network {
            return Err(if sample {
                "--no-network: cannot stream DataCite annual data file".to_string()
            } else {
                format!(
                    "--no-network: cached TAR not found at {}; provide a download URL to cache it first",
                    cache_path.display()
                )
            });
        }
        if sample {
            eprintln!("import: streaming DataCite 2025 (first {} records, no cache)", limit);
        } else {
            eprintln!("import: downloading DataCite 2025 annual data file to {}", cache_path.display());
        }

        // Generous timeout: the transfer covers a multi-gigabyte file.
        let client = reqwest::blocking::Client::builder()
            .user_agent(io_utils::commonmeta_user_agent())
            .timeout(std::time::Duration::from_secs(24 * 60 * 60))
            .build()
            .map_err(|e| format!("HTTP client error: {}", e))?;
        let mut resp = client
            .get(url)
            .send()
            .and_then(|r| r.error_for_status())
            .map_err(|e| format!("download failed: {}", e))?;

        if sample {
            // Sample without a cache: read the TAR off the wire and stop early.
            return process_datacite_archive(Archive::new(BufReader::new(resp)), limit, true);
        }

        if let Some(parent) = cache_path.parent() {
            std::fs::create_dir_all(parent).map_err(|e| format!("mkdir: {}", e))?;
        }
        // Download to a sibling ".part" file and publish it with a rename.
        let part_path = cache_path.with_extension("tar.part");
        let mut file = std::fs::File::create(&part_path)
            .map_err(|e| format!("create cache file: {}", e))?;
        let bytes = match std::io::copy(&mut resp, &mut file) {
            Ok(bytes) => bytes,
            Err(e) => {
                drop(file);
                // Best effort: do not leave a partial download behind.
                let _ = std::fs::remove_file(&part_path);
                return Err(format!("download write: {}", e));
            }
        };
        drop(file);
        std::fs::rename(&part_path, &cache_path)
            .map_err(|e| format!("finalize cache file: {}", e))?;
        eprintln!(
            "import: cached {:.1} GB at {}",
            bytes as f64 / 1_073_741_824.0,
            cache_path.display()
        );
    }

    eprintln!(
        "import: processing DataCite 2025 annual data file{}",
        if sample { format!(" (first {} records)", limit) } else { String::new() },
    );
    let file = std::fs::File::open(&cache_path)
        .map_err(|e| format!("open cache: {}", e))?;
    process_datacite_archive(Archive::new(file), limit, sample)
}
/// Reads a DataCite annual TAR and upserts its records into the database
/// returned by `resolve_db_path(None)`.
///
/// Every `.jsonl.gz` entry is decompressed in memory; each non-empty line is
/// wrapped as `{"data": <line>}` and parsed with the `datacite` reader in
/// parallel. Lines that fail to parse are dropped. Processing stops once
/// `limit` records have been written.
///
/// A truncated archive, an entry read error, or a file that cannot be
/// decompressed does not abort the import: whatever was read is kept. In that
/// case the `datacite_annual_date` setting is not recorded, because the
/// corpus was not imported completely. The setting is also skipped when
/// `is_sample` is true.
///
/// # Errors
/// Fails when the TAR cannot be opened for iteration, when an upsert fails,
/// or when the FTS rebuild fails.
fn process_datacite_archive<R: std::io::Read>(mut archive: tar::Archive<R>, limit: usize, is_sample: bool) -> Result<(), String> {
    use flate2::read::GzDecoder;
    use rayon::prelude::*;
    use std::io::{BufRead, Read as _};

    let out_path = resolve_db_path(None);
    let out_sqlite = Path::new(&out_path);
    let mut total_records = 0usize;
    let start = Instant::now();
    let mut file_count = 0usize;
    // Set when any part of the archive had to be skipped or could not be read.
    let mut incomplete = false;

    'entries: for entry_result in archive.entries().map_err(|e| format!("TAR read: {}", e))? {
        let mut entry = match entry_result {
            Ok(e) => e,
            Err(e) => {
                eprintln!("import: TAR truncated after {} files ({} records) — {}", file_count, total_records, e);
                incomplete = true;
                break 'entries;
            }
        };
        let name = entry.path()
            .map(|p| p.to_string_lossy().into_owned())
            .unwrap_or_default();
        if !name.ends_with(".jsonl.gz") {
            continue;
        }

        let mut compressed: Vec<u8> = Vec::new();
        if let Err(e) = entry.read_to_end(&mut compressed) {
            eprintln!("import: read error in {}: {}", name, e);
            incomplete = true;
            break 'entries;
        }
        let mut decompressed: Vec<u8> = Vec::new();
        if let Err(e) = GzDecoder::new(compressed.as_slice()).read_to_end(&mut decompressed) {
            eprintln!("import: decompress error in {}: {}", name, e);
            incomplete = true;
            continue;
        }
        if decompressed.is_empty() {
            continue;
        }
        file_count += 1;

        // Never read more lines than are still needed to reach `limit`.
        let take = limit.saturating_sub(total_records);
        let lines: Vec<String> = decompressed
            .as_slice()
            .lines()
            .filter_map(|l| l.ok())
            .map(|l| l.trim().to_string())
            .filter(|l| !l.is_empty())
            .take(take)
            .collect();
        if lines.is_empty() {
            continue;
        }

        let batch: Vec<crate::Data> = lines
            .par_iter()
            .filter_map(|trimmed| {
                let input = format!("{{\"data\":{}}}", trimmed);
                crate::read("datacite", &input).ok()
            })
            .collect();
        let n = batch.len();
        if n > 0 {
            crate::upsert_sqlite(&batch, out_sqlite)
                .map_err(|e| format!("upsert failed after {}: {}", name, e))?;
            total_records += n;
        }
        eprintln!("import: {} — {} records ({} total in {:.0?})", name, n, total_records, start.elapsed());
        if total_records >= limit {
            break 'entries;
        }
    }

    let db_total = crate::count_sqlite_works(out_sqlite).ok();
    eprintln!("import: {} files, {} records in {:.2?}", file_count, total_records, start.elapsed());
    if !is_sample {
        if incomplete {
            eprintln!("import: archive was not read completely; not recording datacite_annual_date");
        } else if let Err(e) = crate::set_sqlite_setting(out_sqlite, "datacite_annual_date", "2025") {
            // The records are already written; a failed marker is only a warning.
            eprintln!("import: warning: could not record datacite_annual_date: {}", e);
        }
    }
    eprintln!("import: rebuilding FTS index…");
    crate::rebuild_works_fts(out_sqlite).map_err(|e| format!("FTS rebuild: {e}"))?;
    println!("{}", fmt_wrote_sqlite(&out_path, total_records, db_total));
    Ok(())
}
/// Requester-pays S3 bucket that hosts the Crossref annual public data file.
const CROSSREF_S3_BUCKET: &str = "api-snapshots-reqpays-crossref";
/// Default object key of the annual TAR; the `CROSSREF_S3_KEY` environment
/// variable overrides it.
const CROSSREF_S3_KEY: &str = "March_2026_Public_Data_File_from_Crossref.tar";

/// Imports the Crossref annual data file from the S3 requester-pays bucket.
///
/// The TAR is cached as `crossref-annual-s3.tar` in the `crossref` cache
/// directory. When no non-empty cache exists, it is downloaded by running
/// `aws s3 cp --request-payer requester`. With `sample`, only the first 1000
/// lines of an already cached TAR are processed; sampling never triggers the
/// download.
///
/// # Errors
/// Fails when `sample` is set without a cache, when `no_network` is set
/// without a cache, when the `aws` CLI cannot be run or exits unsuccessfully,
/// on file-system errors, and on any error from
/// `process_crossref_s3_archive`.
fn import_crossref_s3(sample: bool, no_network: bool) -> Result<(), String> {
    const CROSSREF_S3_SAMPLE_LINES: usize = 1_000;
    let limit = if sample { CROSSREF_S3_SAMPLE_LINES } else { usize::MAX };

    let cache_path = io_utils::cache_dir("crossref").join("crossref-annual-s3.tar");
    let cached = cache_path.exists() && cache_path.metadata().map(|m| m.len()).unwrap_or(0) > 0;

    if sample && !cached {
        return Err(format!(
            "import: --s3 --sample requires the cached TAR at {}.\n\
             Run without --sample first to download the full archive, then use --sample \
             to test against the cached data.\n\
             For a quick smoke-test without downloading, use:\n\
             commonmeta import --from crossref --sample (random sample of up to 100 records via the Crossref API)",
            cache_path.display()
        ));
    }

    if cached {
        eprintln!("import: using cached Crossref annual TAR at {}", cache_path.display());
    } else {
        if no_network {
            return Err(format!(
                "--no-network: no cached Crossref S3 TAR at {}",
                cache_path.display()
            ));
        }
        // Probe for the CLI first so a missing install gets a clear message.
        if std::process::Command::new("aws").arg("--version").output().is_err() {
            return Err(
                "import: aws CLI not found — install it from https://aws.amazon.com/cli/ \
                 and configure credentials before using --s3"
                    .to_string(),
            );
        }
        let key = std::env::var("CROSSREF_S3_KEY")
            .unwrap_or_else(|_| CROSSREF_S3_KEY.to_string());
        if let Some(parent) = cache_path.parent() {
            std::fs::create_dir_all(parent).map_err(|e| format!("mkdir: {}", e))?;
        }
        eprintln!(
            "import: downloading s3://{}/{} (~208 GB) to {}\n\
             Note: this is a requester-pays bucket; bandwidth costs (~$18) are charged to your AWS account.",
            CROSSREF_S3_BUCKET, key, cache_path.display()
        );
        // The destination is passed as a path (OsStr), not a lossy UTF-8
        // string, so unusual cache locations are handed to `aws` unchanged.
        let status = std::process::Command::new("aws")
            .args(["s3", "cp", "--request-payer", "requester"])
            .arg(format!("s3://{}/{}", CROSSREF_S3_BUCKET, key))
            .arg(&cache_path)
            .status()
            .map_err(|e| format!("aws s3 cp failed: {}", e))?;
        if !status.success() {
            return Err(
                "aws s3 cp failed — check AWS credentials, IAM permissions, \
                 and that you accept requester-pays charges for this bucket"
                    .to_string(),
            );
        }
        eprintln!("import: cached Crossref annual TAR at {}", cache_path.display());
    }

    eprintln!(
        "import: processing Crossref annual data file{}",
        if sample { format!(" (first {} records)", limit) } else { String::new() },
    );
    let file = std::fs::File::open(&cache_path)
        .map_err(|e| format!("open cache: {}", e))?;
    process_crossref_s3_archive(tar::Archive::new(file), limit, sample)
}
/// Reads a Crossref annual TAR and upserts its records into the database
/// returned by `resolve_db_path(None)`.
///
/// Entries named `*.jsonl.gz` / `*.json.gz` are decompressed in memory;
/// `*.jsonl` / `*.json` entries are used as they are. Each non-empty line is
/// wrapped as `{"message": <line>}` and parsed with the `crossref` reader in
/// parallel. Lines that fail to parse are dropped. Processing stops once
/// `limit` records have been written.
///
/// A truncated archive, an entry read error, or a file that cannot be
/// decompressed does not abort the import: whatever was read is kept. In that
/// case the `crossref_annual_date` setting is not recorded, because the
/// corpus was not imported completely. The setting is also skipped when
/// `is_sample` is true.
///
/// # Errors
/// Fails when the TAR cannot be opened for iteration, when an upsert fails,
/// or when the FTS rebuild fails.
fn process_crossref_s3_archive<R: std::io::Read>(mut archive: tar::Archive<R>, limit: usize, is_sample: bool) -> Result<(), String> {
    use flate2::read::GzDecoder;
    use rayon::prelude::*;
    use std::io::{BufRead, Read as _};

    let out_path = resolve_db_path(None);
    let out_sqlite = Path::new(&out_path);
    let mut total_records = 0usize;
    let start = Instant::now();
    let mut file_count = 0usize;
    // Set when any part of the archive had to be skipped or could not be read.
    let mut incomplete = false;

    'entries: for entry_result in archive.entries().map_err(|e| format!("TAR read: {}", e))? {
        let mut entry = match entry_result {
            Ok(e) => e,
            Err(e) => {
                eprintln!("import: TAR truncated after {} files ({} records) — {}", file_count, total_records, e);
                incomplete = true;
                break 'entries;
            }
        };
        let name = entry.path()
            .map(|p| p.to_string_lossy().into_owned())
            .unwrap_or_default();
        let is_gz = name.ends_with(".jsonl.gz") || name.ends_with(".json.gz");
        let is_jsonl = name.ends_with(".jsonl") || name.ends_with(".json");
        if !is_gz && !is_jsonl {
            continue;
        }

        let mut raw: Vec<u8> = Vec::new();
        if let Err(e) = entry.read_to_end(&mut raw) {
            eprintln!("import: read error in {}: {}", name, e);
            incomplete = true;
            break 'entries;
        }
        if raw.is_empty() {
            continue;
        }
        let decompressed: Vec<u8> = if is_gz {
            let mut buf = Vec::new();
            if let Err(e) = GzDecoder::new(raw.as_slice()).read_to_end(&mut buf) {
                eprintln!("import: decompress error in {}: {}", name, e);
                incomplete = true;
                continue;
            }
            buf
        } else {
            raw
        };
        if decompressed.is_empty() {
            continue;
        }
        file_count += 1;

        // Never read more lines than are still needed to reach `limit`.
        let take = limit.saturating_sub(total_records);
        let lines: Vec<String> = decompressed
            .as_slice()
            .lines()
            .filter_map(|l| l.ok())
            .map(|l| l.trim().to_string())
            .filter(|l| !l.is_empty())
            .take(take)
            .collect();
        if lines.is_empty() {
            continue;
        }

        let batch: Vec<crate::Data> = lines
            .par_iter()
            .filter_map(|trimmed| {
                let input = format!(r#"{{"message":{}}}"#, trimmed);
                crate::read("crossref", &input).ok()
            })
            .collect();
        let n = batch.len();
        if n > 0 {
            crate::upsert_sqlite(&batch, out_sqlite)
                .map_err(|e| format!("upsert failed after {}: {}", name, e))?;
            total_records += n;
        }
        eprintln!("import: {} — {} records ({} total in {:.0?})", name, n, total_records, start.elapsed());
        if total_records >= limit {
            break 'entries;
        }
    }

    let db_total = crate::count_sqlite_works(out_sqlite).ok();
    eprintln!("import: {} files, {} records in {:.2?}", file_count, total_records, start.elapsed());
    if !is_sample {
        if incomplete {
            eprintln!("import: archive was not read completely; not recording crossref_annual_date");
        } else if let Err(e) = crate::set_sqlite_setting(out_sqlite, "crossref_annual_date", "2026-03") {
            // The records are already written; a failed marker is only a warning.
            eprintln!("import: warning: could not record crossref_annual_date: {}", e);
        }
    }
    eprintln!("import: rebuilding FTS index…");
    crate::rebuild_works_fts(out_sqlite).map_err(|e| format!("FTS rebuild: {e}"))?;
    println!("{}", fmt_wrote_sqlite(&out_path, total_records, db_total));
    Ok(())
}
/// Copies every record of a commonmeta SQLite file into the output database.
///
/// Reads `src_path` with `crate::read_sqlite_commonmeta`, upserts the result
/// into `out_path`, logs the timings to stderr and prints the summary line
/// produced by `fmt_wrote_sqlite` to stdout.
///
/// # Errors
/// Returns a message when the source cannot be read or the upsert fails.
fn import_commonmeta_sqlite(src_path: &str, out_path: &str) -> Result<(), String> {
    let overall_timer = Instant::now();
    let (source_db, target_db) = (Path::new(src_path), Path::new(out_path));

    // Load phase.
    let read_timer = Instant::now();
    let records = match crate::read_sqlite_commonmeta(source_db, None, 0) {
        Ok(records) => records,
        Err(e) => return Err(format!("failed to read '{}': {}", src_path, e)),
    };
    eprintln!("import: read {} records in {:.2?}", records.len(), read_timer.elapsed());

    // Store phase.
    let write_timer = Instant::now();
    if let Err(e) = crate::upsert_sqlite(&records, target_db) {
        return Err(e.to_string());
    }
    let works_in_db = crate::count_sqlite_works(target_db).ok();
    eprintln!("import: upsert took {:.2?}", write_timer.elapsed());
    eprintln!("import: total {:.2?}", overall_timer.elapsed());

    println!("{}", fmt_wrote_sqlite(out_path, records.len(), works_in_db));
    Ok(())
}
/// Imports one record, identified by `identifier`, using the `from` reader.
///
/// The record and the works returned by `crate::fetch_reference_works` are
/// upserted into `out_path`. Afterwards the main record is passed to
/// `crate::enrich_citations`, and the main record plus the reference works
/// are printed to stdout as a pretty JSON array, each with its `references`
/// list cleared.
///
/// # Errors
/// Returns a message when reading, upserting or JSON serialization fails.
fn import_single(identifier: &str, from: &str, out_path: &str) -> Result<(), String> {
    let db = Path::new(out_path);

    let fetch_timer = Instant::now();
    let mut record = match crate::read(from, identifier) {
        Ok(record) => record,
        Err(e) => return Err(e.to_string()),
    };
    eprintln!("import: fetch took {:.2?}", fetch_timer.elapsed());

    let referenced = crate::fetch_reference_works(&record, Some(db), false);

    // Store the main record first, followed by its referenced works.
    let write_timer = Instant::now();
    let mut to_store: Vec<crate::Data> = vec![record.clone()];
    to_store.extend(referenced.clone());
    crate::upsert_sqlite(&to_store, db).map_err(|e| e.to_string())?;
    eprintln!("import: upsert took {:.2?} ({} records)", write_timer.elapsed(), to_store.len());

    // Enrichment happens after the upsert and only affects the printed output.
    crate::enrich_citations(&mut record, db);

    let mut json_items = Vec::new();
    let mut main_out = crate::prepare_commonmeta(&record);
    main_out.references.clear();
    json_items.push(serde_json::to_value(main_out).map_err(|e| e.to_string())?);
    for reference in &referenced {
        let mut ref_out = crate::prepare_commonmeta(reference);
        ref_out.references.clear();
        let value = serde_json::to_value(ref_out).map_err(|e| e.to_string())?;
        json_items.push(value);
    }

    let rendered = serde_json::to_vec_pretty(&json_items).map_err(|e| e.to_string())?;
    println!("{}", String::from_utf8_lossy(&rendered));
    Ok(())
}
fn import_cache(cache_path: &str, out_path: &str, people_path: &str) -> Result<(), String> {
let cache_sqlite = std::path::Path::new(cache_path);
let out_sqlite = std::path::Path::new(out_path);
let people_sqlite = std::path::Path::new(people_path);
if !cache_sqlite.exists() {
return Err(format!(
"import: cache not found at {}\n\
Set CACHE_DB or use --cache-db to specify the path",
cache_path
));
}
eprintln!("import: reading from cache at {}", cache_path);
let start = Instant::now();
let n = crate::stream_pidbox_to_sqlite(cache_sqlite, out_sqlite, 0, true)
.map_err(|e| format!("import: cache stream failed: {}", e))?;
let total = crate::count_sqlite_works(out_sqlite).ok();
eprintln!("import: imported {} work records in {:.2?}", n, start.elapsed());
let n_people = crate::stream_cache_orcid_to_people_sqlite(cache_sqlite, people_sqlite)
.map_err(|e| format!("import: cache orcid stream failed: {}", e))?;
if n_people > 0 {
eprintln!("import: imported {} person records into {}", n_people, people_path);
}
let deleted = crate::flush_dragoman_cache(cache_sqlite)
.map_err(|e| format!("import: flush cache failed: {}", e))?;
eprintln!("import: flushed {} rows from cache", deleted);
println!("{}", fmt_wrote_sqlite(out_path, n, total));
Ok(())
}
fn import_pubmed(
explicit_csv: Option<&str>,
out_path: &str,
no_network: bool,
) -> Result<(), String> {
let gz_path = crate::pubmed::resolve_pmc_ids_path(explicit_csv, no_network)
.map_err(|e| format!("import: {e}"))?;
let out_sqlite = std::path::Path::new(out_path);
eprintln!("import: reading PMC-ids from {}", gz_path.display());
let start = Instant::now();
let n = crate::stream_pmc_ids_to_sqlite(&gz_path, out_sqlite, 0, no_network)
.map_err(|e| format!("import: PMC-ids stream failed: {e}"))?;
let total = crate::count_sqlite_works(out_sqlite).ok();
eprintln!("import: imported {} records in {:.2?}", n, start.elapsed());
println!("{}", fmt_wrote_sqlite(out_path, n, total));
Ok(())
}
/// Imports a VRAIX SQLite dump for registration agency `from` into the
/// database at `out_path`.
///
/// The input is selected as follows:
///
/// * `input_path` is `Some`: that SQLite file is read directly (a `date`, if
///   also given, is not used).
/// * only `date` is `Some`: `https://metadata.vraix.org/{from}-{date}.sqlite3.zst`
///   is obtained through `io_utils::ensure_cached_path`, decompressed into a
///   temporary file next to `out_path`, and that temporary file is removed
///   once conversion has finished (successfully or not).
///
/// Timing information is written to stderr and a summary line to stdout.
///
/// # Errors
///
/// Returns an error when neither `input_path` nor `date` is supplied, when the
/// download or decompression fails, or when the conversion into `out_path`
/// fails. A partially written temporary file is removed before a
/// decompression error is returned.
fn import_vraix_fast(
    from: &str,
    input_path: Option<&str>,
    date: Option<&str>,
    out_path: &str,
) -> Result<(), String> {
    let total_start = Instant::now();
    let out_sqlite = std::path::PathBuf::from(out_path);

    // `is_temp` records whether `in_sqlite` is a scratch file that this
    // function created and therefore has to delete.
    let (in_sqlite, is_temp) = match (input_path, date) {
        // An explicit input file always wins over `--date`.
        (Some(path), _) => (std::path::PathBuf::from(path), false),
        (None, Some(date)) => {
            let url = format!("https://metadata.vraix.org/{}-{}.sqlite3.zst", from, date);
            let cache_key = format!("{}-{}.sqlite3.zst", from, date);

            let dl_start = Instant::now();
            let (cache_path, from_cache) =
                io_utils::ensure_cached_path(&url, "vraix", &cache_key, VRAIX_CACHE_TTL)
                    .map_err(|e| format!("failed to download '{}': {}", url, e))?;
            let size = cache_path.metadata().map(|m| m.len()).unwrap_or(0);
            eprintln!(
                "import: download took {:.2?} ({} bytes{})",
                dl_start.elapsed(),
                size,
                if from_cache { ", from cache" } else { "" }
            );

            let dc_start = Instant::now();
            // The process id keeps concurrent imports from sharing a temp file.
            let tmp =
                out_sqlite.with_extension(format!("sqlite3.vraix-{}.tmp", std::process::id()));
            let dc_bytes = match io_utils::decompress_zst_file(&cache_path, &tmp) {
                Ok(bytes) => bytes,
                Err(e) => {
                    // Do not leave a partially decompressed dump behind.
                    std::fs::remove_file(&tmp).ok();
                    return Err(format!("failed to decompress '{}': {}", url, e));
                }
            };
            eprintln!(
                "import: decompress took {:.2?} ({} bytes)",
                dc_start.elapsed(),
                dc_bytes
            );
            (tmp, true)
        }
        // Previously this case panicked on `input_path.unwrap()`.
        (None, None) => {
            return Err("import: either an input path or --date is required".to_string());
        }
    };

    let convert_start = Instant::now();
    let result = crate::stream_vraix_to_sqlite(&in_sqlite, from, &out_sqlite, 0, true)
        .map_err(|e| e.to_string());
    // Clean up before propagating a conversion error; removal is best-effort.
    if is_temp {
        std::fs::remove_file(&in_sqlite).ok();
    }
    let n = result?;

    // A failed count is tolerated: the summary is then built without a total.
    let total = crate::count_sqlite_works(&out_sqlite).ok();
    eprintln!(
        "import: convert+write took {:.2?} ({} records)",
        convert_start.elapsed(),
        n
    );
    eprintln!("import: total took {:.2?}", total_start.elapsed());
    println!("{}", fmt_wrote_sqlite(out_path, n, total));
    Ok(())
}
/// Installs the latest ROR release into the SQLite database at `out_path`.
///
/// The latest release metadata is fetched first and compared with the version
/// recorded in the database:
///
/// * same version and `force` is `false`: nothing is written; an
///   "already installed" line is printed and the function returns `Ok(())`.
/// * same version and `force` is `true`: the release is imported again.
/// * different version: the release is imported as an upgrade.
/// * no installed version: the release is imported.
///
/// Progress and timings go to stderr; the final result line goes to stdout.
///
/// # Errors
///
/// Returns the stringified error of whichever step failed: fetching release
/// metadata, reading the installed version, downloading the release, or
/// writing the SQLite database.
pub(crate) fn install_ror(out_path: &str, force: bool) -> Result<(), String> {
    let total = Instant::now();
    eprintln!("Fetching latest ROR release metadata from Zenodo...");
    let t = Instant::now();
    let release = crate::fetch_latest_ror_release().map_err(|e| e.to_string())?;
    eprintln!("  metadata fetched in {:.2}s", t.elapsed().as_secs_f64());

    let db_path = Path::new(out_path);
    let installed = crate::fetch_installed_ror_version(db_path).map_err(|e| e.to_string())?;
    match &installed {
        Some(version) if version == &release.version && !force => {
            println!(
                "ROR {} ({}) is already installed at {}",
                release.version, release.date, out_path
            );
            return Ok(());
        }
        Some(version) if version == &release.version => {
            // Version followed by release date, matching the "already
            // installed" message above (the version used to be printed twice).
            eprintln!("Re-importing ROR {} ({}) (--force)...", version, release.date);
        }
        Some(version) => {
            eprintln!("Upgrading ROR {} → {}...", version, release.version);
        }
        None => {}
    }

    let t = Instant::now();
    let (list, from_cache) =
        crate::download_ror_release(&release).map_err(|e| e.to_string())?;
    eprintln!(
        "  {} and parsed {} organizations in {:.2}s",
        if from_cache { "loaded" } else { "downloaded" },
        list.len(),
        t.elapsed().as_secs_f64()
    );

    eprintln!("Writing to {}...", out_path);
    let t = Instant::now();
    crate::write_ror_sqlite(&list, db_path, Some(&release.version), Some(&release.date))
        .map_err(|e| e.to_string())?;
    eprintln!("  SQLite written in {:.2}s", t.elapsed().as_secs_f64());
    eprintln!("  total: {:.2}s", total.elapsed().as_secs_f64());

    println!(
        "Installed ROR {} ({}) → {} ({} organizations)",
        release.version,
        release.date,
        out_path,
        list.len(),
    );
    Ok(())
}
/// Installs the GeoNames cities500 dump into the SQLite database at `out_path`.
///
/// Today's UTC date (`YYYY-MM-DD`) is used as the install date. When the
/// database already records that date and `force` is `false`, nothing is
/// installed and an "already installed" line is printed. With `force` set,
/// an existing install date only triggers a "Re-importing" notice on stderr.
/// A failure to read the recorded date is treated as "nothing installed".
///
/// # Errors
///
/// Returns the stringified error from `crate::install_geonames_sqlite`.
pub(crate) fn install_geonames(out_path: &str, force: bool) -> Result<(), String> {
    let overall = Instant::now();
    let db_path = Path::new(out_path);
    let today = chrono::Utc::now().format("%Y-%m-%d").to_string();

    // One lookup serves both the skip check and the --force notice.
    let recorded = crate::fetch_installed_geonames_date(db_path).ok().flatten();
    match (&recorded, force) {
        (Some(installed), true) => {
            eprintln!("Re-importing GeoNames ({}) (--force)...", installed);
        }
        (Some(installed), false) if installed == &today => {
            println!("GeoNames ({}) is already installed at {}", installed, out_path);
            return Ok(());
        }
        _ => {}
    }

    eprintln!("Downloading GeoNames cities500 dump...");
    let step = Instant::now();
    let (count, from_cache) = match crate::install_geonames_sqlite(db_path, Some(&today)) {
        Ok(outcome) => outcome,
        Err(e) => return Err(e.to_string()),
    };
    let verb = if from_cache { "loaded" } else { "downloaded" };
    eprintln!(
        "  {} and parsed {} places in {:.2}s",
        verb,
        count,
        step.elapsed().as_secs_f64()
    );
    eprintln!("  total: {:.2}s", overall.elapsed().as_secs_f64());

    println!(
        "Installed GeoNames ({}) → {} ({} places)",
        today, out_path, count,
    );
    Ok(())
}
/// Installs the pidbox dump into the SQLite database at `out_path`.
///
/// The compressed dump at `PIDBOX_URL` is obtained through
/// `io_utils::ensure_cached_path` and then converted into `out_path` by
/// `crate::stream_zst_pidbox_to_sqlite`. If the database reports a VRAIX
/// date afterwards, it is appended to the summary line on stdout; a failed
/// or empty lookup simply omits it.
///
/// # Errors
///
/// Returns an error when the download or the conversion fails.
pub(crate) fn install_pidbox(out_path: &str) -> Result<(), String> {
    let overall = Instant::now();

    eprintln!("Downloading pidbox from {}...", PIDBOX_URL);
    let step = Instant::now();
    let (cache_path, from_cache) =
        match io_utils::ensure_cached_path(PIDBOX_URL, "vraix", PIDBOX_CACHE_KEY, VRAIX_CACHE_TTL)
        {
            Ok(cached) => cached,
            Err(e) => return Err(format!("failed to download pidbox: {}", e)),
        };
    if !from_cache {
        eprintln!("  downloaded in {:.2}s", step.elapsed().as_secs_f64());
    } else {
        eprintln!("  pidbox download skipped (cached at {})", cache_path.display());
    }

    let target = Path::new(out_path);
    eprintln!("Converting (streaming decompress + convert) → {}…", out_path);
    let step = Instant::now();
    let records = match crate::stream_zst_pidbox_to_sqlite(&cache_path, target, 0) {
        Ok(count) => count,
        Err(e) => return Err(format!("failed to convert pidbox: {}", e)),
    };
    eprintln!(
        "  converted and wrote {} records in {:.0}s",
        records,
        step.elapsed().as_secs_f64()
    );
    eprintln!("  total: {:.0}s", overall.elapsed().as_secs_f64());

    // Optional suffix for the summary; empty when no date is available.
    let date_suffix = match crate::fetch_installed_vraix_date(target) {
        Ok(Some(d)) => format!(", vraix_date: {d}"),
        _ => String::new(),
    };
    println!(
        "Installed pidbox → {} ({} records{})",
        out_path, records, date_suffix
    );
    Ok(())
}
/// Imports works affiliated with a ROR organization into `works_db` and
/// prints the organization followed by the imported works as a pretty JSON
/// array on stdout.
///
/// `number` is the maximum number of works to keep (`0` selects the default
/// of 10) and `page` is forwarded unchanged to the Crossref and DataCite
/// lookups. Results from both sources are merged, de-duplicated by `id`,
/// sorted by `date_published` in descending order and truncated to the limit
/// before being upserted.
///
/// The two work lookups are best-effort: if one fails, a warning is written
/// to stderr and the import continues with whatever the other returned.
///
/// # Errors
///
/// Returns an error when `ror_url` does not normalize to a ROR identifier,
/// when the organization record cannot be fetched, when the upsert fails, or
/// when JSON serialization fails.
fn import_ror_works(ror_url: &str, works_db: &str, number: usize, page: usize) -> Result<(), String> {
    let ror = crate::utils::normalize_ror(ror_url);
    if ror.is_empty() {
        return Err(format!("import: '{}' is not a valid ROR identifier", ror_url));
    }
    let limit = if number == 0 { 10 } else { number };
    let t = Instant::now();
    let org = crate::fetch_ror(&ror).map_err(|e| e.to_string())?;

    let mut works: Vec<crate::Data> = Vec::new();
    // Keep going when a source is unavailable, but say so: otherwise a
    // failed lookup is indistinguishable from an organization with no works.
    match crate::fetch_crossref_by_ror(&ror, limit, page) {
        Ok(mut found) => works.append(&mut found),
        Err(e) => eprintln!("ror: warning: Crossref lookup failed for {}: {}", ror, e),
    }
    match crate::fetch_datacite_by_ror(&ror, limit, page) {
        Ok(mut found) => works.append(&mut found),
        Err(e) => eprintln!("ror: warning: DataCite lookup failed for {}: {}", ror, e),
    }

    if !works.is_empty() {
        // Drop repeated ids, keeping the first occurrence.
        let mut seen = std::collections::HashSet::new();
        works.retain(|d| seen.insert(d.id.clone()));
        // Descending by publication date, then cap at the requested limit.
        works.sort_by(|a, b| b.date_published.cmp(&a.date_published));
        works.truncate(limit);
        crate::upsert_sqlite(&works, Path::new(works_db)).map_err(|e| e.to_string())?;
    }
    eprintln!("ror: imported {} works for {} in {:.2?}", works.len(), ror, t.elapsed());

    // Output: organization record first, then each work in commonmeta form.
    let org_val = serde_json::to_value(&org).map_err(|e| e.to_string())?;
    let mut items = vec![org_val];
    for work in &works {
        let prepared = crate::prepare_commonmeta(work);
        items.push(serde_json::to_value(&prepared).map_err(|e| e.to_string())?);
    }
    let output = serde_json::to_vec_pretty(&items).map_err(|e| e.to_string())?;
    println!("{}", String::from_utf8_lossy(&output));
    Ok(())
}
fn import_orcid_person(orcid_url: &str, people_db: &str, works_db: &str) -> Result<(), String> {
let t = Instant::now();
let n_works = crate::import_orcid_person(
orcid_url,
Path::new(people_db),
Path::new(works_db),
)
.map_err(|e| e.to_string())?;
eprintln!("orcid: imported {} in {:.2?}", orcid_url, t.elapsed());
println!(
"orcid: {} → {} ({} works → {})",
orcid_url, people_db, n_works, works_db
);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `args` with the `import` command definition, panicking on a
    /// parse failure.
    fn parse_args(args: &[&str]) -> clap::ArgMatches {
        command().try_get_matches_from(args).expect("arg parse failed")
    }

    /// Runs `execute` for `args` and returns the error it produced; panics if
    /// the command unexpectedly succeeds.
    fn run_expecting_failure(args: &[&str]) -> String {
        let matches = parse_args(args);
        execute(&matches).unwrap_err()
    }

    // A bare DOI argument combined with --no-network must be rejected with
    // an error that names the flag.
    #[test]
    fn test_no_network_with_doi_errors() {
        let err = run_expecting_failure(&["import", "--no-network", "10.7554/elife.01567"]);
        let mentions_flag = err.contains("--no-network");
        assert!(
            mentions_flag,
            "expected --no-network in error, got: {err}"
        );
    }

    // An API fetch (--from/--ror) combined with --no-network must be rejected
    // with an error that names the flag.
    #[test]
    fn test_no_network_with_api_fetch_errors() {
        let err = run_expecting_failure(&[
            "import",
            "--no-network",
            "--from",
            "crossref",
            "--ror",
            "00pd74e08",
        ]);
        let mentions_flag = err.contains("--no-network");
        assert!(
            mentions_flag,
            "expected --no-network in error, got: {err}"
        );
    }

    // A local .sqlite3 argument still fails in this test, but the error must
    // not come from the --no-network guard.
    #[test]
    fn test_no_network_with_local_sqlite_passes_guard() {
        let err = run_expecting_failure(&["import", "--no-network", "local.sqlite3"]);
        let mentions_flag = err.contains("--no-network");
        assert!(
            !mentions_flag,
            "should not fail at network guard for local sqlite, got: {err}"
        );
    }
}