Skip to main content

commonmeta/cmd/
import.rs

1/*
2 * Copyright © 2026 Front Matter <info@front-matter.de>
3 */
4
5use std::path::Path;
6use std::time::Instant;
7
8use clap::{Arg, ArgAction, ArgMatches, Command};
9
10use crate::{io_utils};
11
12use crate::cmd::{
13    resolve_db_path, resolve_cache_db_path,
14    DATACITE_ANNUAL_HOST,
15    PIDBOX_CACHE_KEY, PIDBOX_URL, VRAIX_CACHE_TTL,
16};
17use crate::cmd::convert::{detect_format, ra_to_reader};
18use crate::cmd::list::{fetch_list_from_api, fmt_wrote_sqlite};
19
20pub fn command() -> Command {
21    Command::new("import")
22        .about("Import scholarly metadata into the local commonmeta database")
23        .long_about(
24            "Download and import scholarly metadata into the local commonmeta SQLite \
25            database (always upserts — existing records are updated, not replaced).\n\n\
26            The output path defaults to the COMMONMETA_DB environment variable or \
27            the platform default (~/Library/Application Support/commonmeta/commonmeta.sqlite3 \
28            on macOS, /var/lib/commonmeta/commonmeta.sqlite3 on Linux).\n\n\
29            Single record:\n\
30            commonmeta import 10.7554/elife.01561\n\
31            commonmeta import https://doi.org/10.7554/elife.01561\n\n\
32            Annual public data files:\n\
33            commonmeta import --from crossref --s3      # March 2026 corpus from S3 requester-pays bucket (~$18)\n\
34            commonmeta import \"https://datafiles.datacite.org/datafiles/public-2025/download?token=<TOKEN>\"\n\
35                                                        # DataCite 2025 (108 M records, 33 GB); token from\n\
36                                                        # https://datafiles.datacite.org/datafiles/public-2025\n\
37                                                        # TAR cached at ~/Library/Caches/commonmeta/datacite/public-2025.tar\n\
38            commonmeta import --from datacite           # re-import from cached TAR (no token needed)\n\
39            commonmeta import --from datacite --sample  # first 1000 records from cache\n\n\
40            Daily VRAIX dumps:\n\
41            commonmeta import --from crossref --date 2026-06-15\n\
42            commonmeta import --from datacite --date 2026-06-15\n\
43            commonmeta import crossref-2026-06-15.sqlite3\n\n\
44            Dragoman cache import:\n\
45            commonmeta import --from cache\n\
46            commonmeta import --from cache --cache-db /path/to/cache.sqlite3\n\
47            CACHE_DB=/path/to/cache.sqlite3 commonmeta import --from cache\n\n\
48            API fetch:\n\
49            commonmeta import --from crossref --number 100 --member 78\n\
50            commonmeta import --from datacite --number 100 --client cern.zenodo\n\
51            commonmeta import --from openalex --number 100 --type journal-article\n\n\
52            Vocabulary installs:\n\
53            commonmeta import --from ror\n\
54            commonmeta import --from pidbox\n\
55            commonmeta import --from geonames\n\n\
56            ORCID:\n\
57            commonmeta import https://orcid.org/0000-0003-1419-2405  # single person\n\
58            commonmeta import --from orcid                            # bulk: auto-discover summaries URL\n\
59                                                                      # from figshare and import into people\n\
60                                                                      # table (~46 GB, not the 221 GB bundle)\n\
61            commonmeta import --from orcid --list-releases            # print summaries URL without importing\n\
62                                                                      # (run locally; copy URL to server)\n\
63            commonmeta import --from orcid \"<SUMMARIES_URL>\"          # bulk with direct URL (for servers\n\
64                                                                      # where api.figshare.com is blocked)",
65        )
66        .arg(
67            Arg::new("input")
68                .help("DOI, VRAIX SQLite path, or DataCite annual download URL (auto-detected)")
69                .required(false)
70                .index(1),
71        )
72        .arg(
73            Arg::new("from")
74                .long("from")
75                .short('f')
76                .help("Source format: crossref, datacite, cache, geonames, openalex, pidbox, pubmed, ror")
77                .default_value("commonmeta"),
78        )
79        .arg(
80            Arg::new("number")
81                .long("number")
82                .help("Number of records to fetch via API (file and date inputs always import all)")
83                .value_parser(clap::value_parser!(usize))
84                .default_value("0"),
85        )
86        .arg(
87            Arg::new("page")
88                .long("page")
89                .help("Page number for API fetches (1-based)")
90                .value_parser(clap::value_parser!(usize))
91                .default_value("1"),
92        )
93        .arg(Arg::new("member").long("member").help("Crossref member ID"))
94        .arg(Arg::new("client").long("client").help("DataCite client ID"))
95        .arg(Arg::new("type").long("type").help("Work type filter"))
96        .arg(Arg::new("year").long("year").help("Publication year"))
97        .arg(Arg::new("language").long("language").help("Language filter"))
98        .arg(Arg::new("orcid").long("orcid").help("Filter by ORCID"))
99        .arg(Arg::new("ror").long("ror").help("Filter by ROR"))
100        .arg(Arg::new("affiliation").long("affiliation").help("Affiliation name filter"))
101        .arg(Arg::new("country").long("country").help("Country code filter"))
102        .arg(Arg::new("date-updated").long("date-updated").help("Filter by date updated (YYYY-MM-DD)"))
103        .arg(Arg::new("from-host").long("from-host").help("InvenioRDM source host"))
104        .arg(Arg::new("from-token").long("from-token").help("InvenioRDM source API token"))
105        .arg(Arg::new("community").long("community").help("InvenioRDM community slug"))
106        .arg(Arg::new("subject").long("subject").help("Subject area filter"))
107        .arg(Arg::new("depositor").long("depositor").help("Crossref depositor name"))
108        .arg(Arg::new("registrant").long("registrant").help("Crossref registrant name"))
109        .arg(
110            Arg::new("email")
111                .long("email")
112                .help("Email for OpenAlex mailto parameter"),
113        )
114        .arg(
115            Arg::new("sample")
116                .long("sample")
117                .help(
118                    "Return a random sample via API (crossref: max 100, openalex: max 200). \
119                    Crossref --s3: processes first 1000 records from cached TAR (cache must exist). \
120                    DataCite annual URL: streams TAR and stops after 1000 records. \
121                    --from datacite (no URL): uses DataCite API random sample (max 1000) instead of the cached TAR. \
122                    ORCID bulk import: stops after 1000 people; with a URL and no cache, \
123                    streams directly without downloading the full ~46 GB file.",
124                )
125                .action(ArgAction::SetTrue),
126        )
127        .arg(
128            Arg::new("has-orcid")
129                .long("has-orcid")
130                .help("Filter for records with ORCID")
131                .action(ArgAction::SetTrue),
132        )
133        .arg(
134            Arg::new("has-ror-id")
135                .long("has-ror-id")
136                .help("Filter for records with ROR")
137                .action(ArgAction::SetTrue),
138        )
139        .arg(
140            Arg::new("has-references")
141                .long("has-references")
142                .help("Filter for records with references")
143                .action(ArgAction::SetTrue),
144        )
145        .arg(
146            Arg::new("has-relation")
147                .long("has-relation")
148                .help("Filter for records with relation")
149                .action(ArgAction::SetTrue),
150        )
151        .arg(
152            Arg::new("has-abstract")
153                .long("has-abstract")
154                .help("Filter for records with abstract")
155                .action(ArgAction::SetTrue),
156        )
157        .arg(
158            Arg::new("has-award")
159                .long("has-award")
160                .help("Filter for records with award")
161                .action(ArgAction::SetTrue),
162        )
163        .arg(
164            Arg::new("has-license")
165                .long("has-license")
166                .help("Filter for records with license")
167                .action(ArgAction::SetTrue),
168        )
169        .arg(
170            Arg::new("has-archive")
171                .long("has-archive")
172                .help("Filter for records with archive")
173                .action(ArgAction::SetTrue),
174        )
175        .arg(
176            Arg::new("is-archived")
177                .long("is-archived")
178                .help("Filter for archived records")
179                .action(ArgAction::SetTrue),
180        )
181        .arg(
182            Arg::new("vocabulary")
183                .long("vocabulary")
184                .help("Output as vocabulary (e.g. InvenioRDM affiliations YAML)")
185                .action(ArgAction::SetTrue),
186        )
187        .arg(
188            Arg::new("match")
189                .long("match")
190                .help("Enable ROR affiliation matching when reading crossref and datacite records")
191                .default_value("true")
192                .value_parser(clap::value_parser!(bool)),
193        )
194        .arg(Arg::new("date").long("date").help(
195            "Date (YYYY-MM-DD) of a VRAIX daily dump; downloads \
196            {from}-{date}.sqlite3.zst from metadata.vraix.org when no input \
197            file path is given",
198        ))
199        .arg(
200            Arg::new("s3")
201                .long("s3")
202                .help(
203                    "Crossref annual: download from the Crossref S3 bucket \
204                    (s3://api-snapshots-reqpays-crossref). \
205                    Requires the AWS CLI to be installed and configured with credentials; \
206                    the requester pays bandwidth costs (~$18 for the full 2025 file). \
207                    With --sample, processes the first 1000 records from the cached TAR \
208                    (cache must exist; run without --sample first to download).",
209                )
210                .action(ArgAction::SetTrue),
211        )
212        .arg(
213            Arg::new("no-network")
214                .long("no-network")
215                .help("Disable all outbound network requests; only local .sqlite3 file imports are allowed")
216                .action(ArgAction::SetTrue),
217        )
218        .arg(
219            Arg::new("cache-db")
220                .long("cache-db")
221                .help(
222                    "Path to the cache SQLite database \
223                    (default: $CACHE_DB env var, or /var/lib/commonmeta/cache.sqlite3 on Linux). \
224                    Only used with --from cache.",
225                ),
226        )
227        .arg(
228            Arg::new("pubmed-csv")
229                .long("pubmed-csv")
230                .help(
231                    "Path to a local PMC-ids.csv.gz file. \
232                    Defaults to ~/Downloads/PMC-ids.csv.gz, then the platform cache. \
233                    Only used with --from pubmed.",
234                ),
235        )
236        .arg(
237            Arg::new("list-releases")
238                .long("list-releases")
239                .help(
240                    "Print the latest ORCID Public Data File release info and exit \
241                    (requires api.figshare.com access). Use this on a local machine \
242                    to obtain the summaries URL for servers where api.figshare.com \
243                    is blocked. Only used with --from orcid.",
244                )
245                .action(clap::ArgAction::SetTrue),
246        )
247        .arg(
248            Arg::new("people-db")
249                .long("people-db")
250                .help(
251                    "Path to the SQLite database for the 'people' table \
252                    (ORCID records). Defaults to the same path as the main database.",
253                ),
254        )
255        .arg(
256            Arg::new("force")
257                .long("force")
258                .action(ArgAction::SetTrue)
259                .help("Re-import even when the installed version is already up to date (ror, geonames)"),
260        )
261}
262
263pub fn execute(matches: &ArgMatches) -> Result<(), String> {
264    let input_path = matches.get_one::<String>("input").map(String::as_str);
265    let date = matches.get_one::<String>("date").map(String::as_str);
266    let no_network = matches.get_flag("no-network");
267    let force = matches.get_flag("force");
268
269    // DataCite annual data file: the positional input is the time-limited download URL
270    // (obtained by submitting email at https://datafiles.datacite.org/datafiles/public-2025).
271    let is_datacite_annual = input_path
272        .and_then(|p| url::Url::parse(p).ok())
273        .and_then(|u| u.host_str().map(|h| h == DATACITE_ANNUAL_HOST))
274        .unwrap_or(false);
275    if is_datacite_annual {
276        let sample = matches.get_flag("sample");
277        return import_datacite_annual(input_path.unwrap(), sample, no_network);
278    }
279
280    // Auto-detect source from VRAIX filename pattern ({source}-{date}.sqlite3).
281    let is_sqlite_input = input_path
282        .map(|p| io_utils::get_extension(p, ".json").1 == ".sqlite3")
283        .unwrap_or(false);
284    let filename_source: Option<&'static str> = if is_sqlite_input {
285        input_path
286            .and_then(|p| std::path::Path::new(p).file_stem()?.to_str())
287            .and_then(|stem| {
288                if stem.starts_with("crossref-") { Some("crossref") }
289                else if stem.starts_with("datacite-") { Some("datacite") }
290                else { None }
291            })
292    } else {
293        None
294    };
295    let from_explicit = matches.get_one::<String>("from").map(String::as_str).unwrap_or("commonmeta");
296    let from_flag: &str = filename_source.unwrap_or(from_explicit);
297
298    // When a source-specific flag is provided but --from is not set, auto-select
299    // the matching source (e.g. `import --member 78` implies --from crossref).
300    let from_flag = if from_flag == "commonmeta" && input_path.is_none() {
301        let has_member = matches.get_one::<String>("member").map(|s| !s.is_empty()).unwrap_or(false);
302        let has_client = matches.get_one::<String>("client").map(|s| !s.is_empty()).unwrap_or(false);
303        if has_member { "crossref" }
304        else if has_client { "datacite" }
305        else { from_flag }
306    } else {
307        from_flag
308    };
309
310    // ORCID: detect via validate_id, which accepts bare iDs (0000-0003-1419-2405)
311    // and full URLs. Must run before the ROR / DOI detection below.
312    let input_is_orcid = input_path
313        .map(|p| crate::utils::validate_id(p).1 == "ORCID")
314        .unwrap_or(false);
315    if input_is_orcid || from_flag == "orcid" {
316        let works_db = resolve_db_path(None);
317        let people_db_arg = matches.get_one::<String>("people-db").map(String::as_str).unwrap_or("");
318        let people_db = if people_db_arg.is_empty() { works_db.clone() } else { people_db_arg.to_string() };
319
320        // --list-releases: print release info without downloading (run on a machine
321        // where api.figshare.com is reachable, then copy the SUMMARIES URL to the server).
322        if matches.get_flag("list-releases") {
323            let release = crate::fetch_latest_orcid_release().map_err(|e| e.to_string())?;
324            println!("ORCID Public Data File — latest release");
325            println!("  Year/batch : {}_{}", release.year, release.batch);
326            println!("  Filename   : {}", release.filename);
327            println!("  Size       : {:.1} GB compressed", release.size_bytes as f64 / 1_073_741_824.0);
328            println!("  SUMMARIES  : {}", release.download_url);
329            println!();
330            println!("To import on a server where api.figshare.com is blocked:");
331            println!("  commonmeta import --from orcid \"{}\"", release.download_url);
332            return Ok(());
333        }
334
335        match input_path {
336            Some(raw) if input_is_orcid => {
337                // ORCID iD or URL: fetch single person record from the API.
338                if no_network {
339                    return Err("--no-network: ORCID API fetch requires network access; remove --no-network".to_string());
340                }
341                return import_orcid_person(raw, &people_db, &works_db);
342            }
343            url_arg => {
344                // No identifier, or a non-ORCID URL: import ORCID Public Data File.
345                // A URL passed as positional arg bypasses the figshare API discovery
346                // (useful when api.figshare.com is blocked on the host).
347                let url_override = url_arg.filter(|u| u.starts_with("http") || std::path::Path::new(u).exists());
348                let sample = matches.get_flag("sample");
349                let count = crate::import_orcid_public_data(
350                    std::path::Path::new(&people_db),
351                    url_override,
352                    no_network,
353                    sample,
354                )
355                .map_err(|e| e.to_string())?;
356                if count > 0 {
357                    println!("imported {} people from ORCID Public Data File into {}", count, people_db);
358                }
359                return Ok(());
360            }
361        }
362    }
363
364    // ROR URL as positional argument: fetch works by ROR from Crossref + DataCite
365    // and upsert into the works table. A bare `import ror` (no URL) is a vocabulary
366    // install and is handled further below.
367    let input_is_ror = input_path
368        .map(|p| crate::utils::validate_id(p).1 == "ROR")
369        .unwrap_or(false);
370    if input_is_ror {
371        if no_network {
372            return Err("--no-network: ROR works fetch requires network access; remove --no-network".to_string());
373        }
374        let raw = input_path.unwrap();
375        let out_path = resolve_db_path(None);
376        let number = *matches.get_one::<usize>("number").unwrap_or(&0);
377        let page = *matches.get_one::<usize>("page").unwrap_or(&1);
378        return import_ror_works(raw, &out_path, number, page);
379    }
380
381    // Positional shorthand: `import ror` / `import pidbox` / `import crossref` etc.
382    // is treated as `import --from <source>` when --from was not explicitly given
383    // and the positional arg is a known source name rather than a DOI or file path.
384    let (from, input_path) = match input_path {
385        Some(s)
386            if from_flag == "commonmeta"
387                && !is_sqlite_input
388                && matches!(s, "cache" | "crossref" | "geonames" | "openalex" | "pidbox" | "prefixes" | "pubmed" | "ror") =>
389        {
390            (s, None)
391        }
392        _ => (from_flag, input_path),
393    };
394
395    if !matches!(from, "cache" | "crossref" | "datacite" | "geonames" | "openalex" | "pidbox" | "prefixes" | "pubmed" | "ror" | "commonmeta") {
396        return Err(format!(
397            "import: unsupported --from value '{}' (supported: cache, crossref, datacite, geonames, openalex, pidbox, prefixes, pubmed, ror)",
398            from
399        ));
400    }
401
402    // When --no-network is set, only a local VRAIX .sqlite3 file is accepted
403    // for non-pubmed sources. For pubmed, resolve_pmc_ids_path handles the
404    // no-network check internally (and checks ~/Downloads/ before rejecting).
405    if no_network && !(is_sqlite_input && input_path.is_some()) && from != "pubmed" {
406        return Err(
407            "--no-network requires a local .sqlite3 input file; \
408            provide a VRAIX dump path or remove --no-network"
409                .to_string(),
410        );
411    }
412
413    // ROR is a vocabulary install, not a metadata records import.
414    if from == "ror" {
415        let out_path = resolve_db_path(None);
416        return install_ror(&out_path, force);
417    }
418
419    // prefixes: resolve all distinct DOI prefixes in the works table against the DOI RA API.
420    if from == "prefixes" {
421        let db_path = resolve_db_path(matches.get_one::<String>("file"));
422        if !std::path::Path::new(&db_path).exists() {
423            return Err(format!("database not found: {}", db_path));
424        }
425        eprintln!("import: resolving DOI prefixes from {}", db_path);
426        let start = std::time::Instant::now();
427        let n = crate::import_prefixes(std::path::Path::new(&db_path))
428            .map_err(|e| e.to_string())?;
429        eprintln!("import: resolved {} prefix(es) in {:.2?}", n, start.elapsed());
430        return Ok(());
431    }
432
433    // pidbox is a full VRAIX dump installed directly into commonmeta.sqlite3.
434    if from == "pidbox" {
435        let out_path = resolve_db_path(None);
436        return install_pidbox(&out_path);
437    }
438
439    // GeoNames: download cities500 dump and write to geonames table.
440    if from == "geonames" {
441        let out_path = resolve_db_path(None);
442        return install_geonames(&out_path, force);
443    }
444
445    // Cache import: reads from the cache SQLite
446    // (same VRAIX schema) and flushes the cache after a successful import.
447    if from == "cache" {
448        let out_path = resolve_db_path(None);
449        let cache_path = resolve_cache_db_path(matches.get_one::<String>("cache-db"));
450        let people_db_arg = matches.get_one::<String>("people-db").map(String::as_str).unwrap_or("");
451        let people_path = if people_db_arg.is_empty() { out_path.clone() } else { people_db_arg.to_string() };
452        return import_cache(&cache_path, &out_path, &people_path);
453    }
454
455    // PubMed: three sub-paths depending on what the caller provides.
456    if from == "pubmed" {
457        let out_path = resolve_db_path(None);
458        let number = *matches.get_one::<usize>("number").unwrap_or(&0);
459
460        // 1. Single identifier (not a CSV file) → fetch via Europe PMC REST API.
461        if let Some(identifier) = input_path {
462            let looks_like_csv = identifier.ends_with(".csv.gz") || identifier.ends_with(".csv");
463            if !looks_like_csv {
464                if no_network {
465                    return Err("--no-network: Europe PMC API fetch requires network access; remove --no-network".to_string());
466                }
467                return import_single(identifier, "pubmed", &out_path);
468            }
469        }
470
471        // 2. --number N > 0 with no file path → API batch via fetch_list_from_api.
472        //    Fall through to the generic API fetch path at the bottom.
473        if number > 0 {
474            if no_network {
475                return Err("--no-network: Europe PMC API fetch requires network access; remove --no-network".to_string());
476            }
477            // handled below by the generic fetch_list_from_api path
478        } else {
479            // 3. Bulk PMC-ids.csv.gz upsert.  resolve_pmc_ids_path handles no_network
480            //    internally (checks ~/Downloads/ before rejecting).
481            let explicit_csv = matches.get_one::<String>("pubmed-csv").map(String::as_str);
482            return import_pubmed(explicit_csv, &out_path, no_network);
483        }
484    }
485
486    let out_path = resolve_db_path(None);
487    let is_vraix_sqlite = is_sqlite_input && matches!(from, "crossref" | "datacite");
488    let is_date_download = date.is_some() && input_path.is_none() && matches!(from, "crossref" | "datacite");
489
490    // Fast path: stream VRAIX SQLite → commonmeta SQLite without loading all
491    // records into RAM. Always imports every row (limit=0). Always upserts.
492    if is_vraix_sqlite || is_date_download {
493        return import_vraix_fast(from, input_path, date, &out_path);
494    }
495
496    // Annual S3 path: `import --from crossref` with no date, no input file,
497    // and no API-specific filters (--sample included). With --s3 downloads the
498    // full Crossref corpus from the requester-pays S3 bucket. Without --s3 and
499    // without --sample, suggest --s3 or add a filter to use the API.
500    let has_api_filters = matches.get_one::<String>("member").map(|s| !s.is_empty()).unwrap_or(false)
501        || matches.get_one::<String>("client").map(|s| !s.is_empty()).unwrap_or(false)
502        || matches.get_one::<String>("orcid").map(|s| !s.is_empty()).unwrap_or(false)
503        || matches.get_one::<String>("ror").map(|s| !s.is_empty()).unwrap_or(false)
504        || *matches.get_one::<usize>("number").unwrap_or(&0) > 0
505        || matches.get_flag("sample");
506    let s3 = matches.get_flag("s3");
507    let is_annual_crossref = matches!(from, "crossref")
508        && input_path.is_none()
509        && date.is_none()
510        && !has_api_filters;
511    if is_annual_crossref {
512        if s3 {
513            return import_crossref_s3(false, no_network);
514        }
515        return Err(
516            "import: 'commonmeta import --from crossref' requires a filter or mode:\n  \
517             --sample           fetch up to 100 random records via the Crossref API\n  \
518             --number N         fetch N records via the Crossref API\n  \
519             --member ID        filter by member (implies API fetch)\n  \
520             --s3               download the full annual corpus from S3 (~$18)".to_string()
521        );
522    }
523
524    // Annual DataCite path: `import --from datacite` (or `import datacite`) with no
525    // date and no API filters.  Uses the cached TAR if present; otherwise the user
526    // must supply the download URL as the positional argument (handled earlier via
527    // `is_datacite_annual`).
528    let cache_path = io_utils::cache_dir("datacite").join("public-2025.tar");
529    let is_datacite_annual_cmd = from == "datacite"
530        && input_path.is_none()
531        && date.is_none()
532        && !has_api_filters
533        && !matches.get_flag("sample");
534    if is_datacite_annual_cmd {
535        if !cache_path.exists() || cache_path.metadata().map(|m| m.len()).unwrap_or(0) == 0 {
536            return Err(format!(
537                "import: no cached DataCite 2025 TAR found at {}.\n\
538                 Obtain a download URL from https://datafiles.datacite.org/datafiles/public-2025\n\
539                 and run: commonmeta import \"<URL>\"",
540                cache_path.display()
541            ));
542        }
543        let sample = matches.get_flag("sample");
544        return import_datacite_annual("", sample, no_network);
545    }
546
547    // Commonmeta sqlite → commonmeta sqlite (merge/upsert).
548    if is_sqlite_input && from == "commonmeta" {
549        if let Some(src) = input_path {
550            return import_commonmeta_sqlite(src, &out_path);
551        }
552    }
553
554    // Single-record path: DOI, URL, or any identifier that isn't a file path.
555    // Auto-detect the source format from the identifier when --from is not given.
556    if let Some(identifier) = input_path {
557        if !is_sqlite_input {
558            let effective_from = if from_explicit == "commonmeta" {
559                let ra = detect_format(identifier, no_network);
560                match ra_to_reader(&ra) {
561                    Some(reader) => reader.to_string(),
562                    None => return Err(format!(
563                        "import: no reader available for DOI registration agency '{}' \
564                        (use --from to specify a supported format)",
565                        ra
566                    )),
567                }
568            } else {
569                from_explicit.to_string()
570            };
571            return import_single(identifier, &effective_from, &out_path);
572        }
573    }
574
575    // API fetch path: fetch records, then upsert into commonmeta SQLite.
576    if from == "commonmeta" {
577        return Err(
578            "import: --from commonmeta requires an input .sqlite3 file path".to_string()
579        );
580    }
581    let fetch_start = Instant::now();
582    let data = fetch_list_from_api(matches, from)?;
583    eprintln!(
584        "import: fetch took {:.2?} ({} records)",
585        fetch_start.elapsed(),
586        data.len()
587    );
588
589    let out_sqlite = Path::new(&out_path);
590    let write_start = Instant::now();
591    crate::upsert_sqlite(&data, out_sqlite).map_err(|e| e.to_string())?;
592    let total = crate::count_sqlite_works(out_sqlite).ok();
593    eprintln!(
594        "import: upsert took {:.2?} ({} records)",
595        write_start.elapsed(),
596        data.len()
597    );
598    println!("{}", fmt_wrote_sqlite(&out_path, data.len(), total));
599    Ok(())
600}
601
602/// Import the DataCite annual public data file into the local database.
603///
604/// The download is a plain TAR archive containing `.jsonl.gz` files, each holding
605/// up to 10,000 DataCite records.  Some entries are empty placeholders (valid gzip
606/// of 0 bytes) for date partitions with no records — those are silently skipped.
607///
608/// The TAR is cached at `~/Library/Caches/commonmeta/datacite/public-2025.tar` so
609/// re-parses don't require a new token.  `url` (with its 24 h JWT token) is only
610/// needed when the cache is absent.
611///
612/// When `sample` is true, processing stops once DATACITE_SAMPLE_LINES records
613/// have been accumulated (skipping empty entries until data is found).
614fn import_datacite_annual(url: &str, sample: bool, no_network: bool) -> Result<(), String> {
615    use std::io::BufReader;
616    use tar::Archive;
617
618    const DATACITE_SAMPLE_LINES: usize = 1_000;
619    let limit = if sample { DATACITE_SAMPLE_LINES } else { usize::MAX };
620    let cache_path = io_utils::cache_dir("datacite").join("public-2025.tar");
621    let cached = cache_path.exists() && cache_path.metadata().map(|m| m.len()).unwrap_or(0) > 0;
622
623    // --sample without cache: stream directly from URL without downloading 33 GB to disk.
624    if sample && !cached {
625        if url.is_empty() {
626            return Err(format!(
627                "import: no cached DataCite 2025 TAR at {}; \
628                 obtain a download URL from https://datafiles.datacite.org/datafiles/public-2025",
629                cache_path.display()
630            ));
631        }
632        if no_network {
633            return Err("--no-network: cannot stream DataCite annual data file".to_string());
634        }
635        eprintln!("import: streaming DataCite 2025 (first {} records, no cache)", limit);
636        let client = reqwest::blocking::Client::builder()
637            .user_agent(io_utils::commonmeta_user_agent())
638            .timeout(std::time::Duration::from_secs(24 * 60 * 60))
639            .build()
640            .map_err(|e| format!("HTTP client error: {}", e))?;
641        let resp = client.get(url).send()
642            .and_then(|r| r.error_for_status())
643            .map_err(|e| format!("download failed: {}", e))?;
644        return process_datacite_archive(Archive::new(BufReader::new(resp)), limit, true);
645    }
646
647    // Full import (or --sample with existing cache): ensure TAR is on disk.
648    if !cached {
649        if url.is_empty() {
650            return Err(format!(
651                "import: no cached DataCite 2025 TAR at {}; \
652                 obtain a download URL from https://datafiles.datacite.org/datafiles/public-2025",
653                cache_path.display()
654            ));
655        }
656        if no_network {
657            return Err(format!(
658                "--no-network: cached TAR not found at {}; provide a download URL to cache it first",
659                cache_path.display()
660            ));
661        }
662        eprintln!("import: downloading DataCite 2025 annual data file to {}", cache_path.display());
663        let client = reqwest::blocking::Client::builder()
664            .user_agent(io_utils::commonmeta_user_agent())
665            .timeout(std::time::Duration::from_secs(24 * 60 * 60))
666            .build()
667            .map_err(|e| format!("HTTP client error: {}", e))?;
668        let mut resp = client.get(url).send()
669            .and_then(|r| r.error_for_status())
670            .map_err(|e| format!("download failed: {}", e))?;
671        if let Some(parent) = cache_path.parent() {
672            std::fs::create_dir_all(parent).map_err(|e| format!("mkdir: {}", e))?;
673        }
674        let mut file = std::fs::File::create(&cache_path)
675            .map_err(|e| format!("create cache file: {}", e))?;
676        let bytes = std::io::copy(&mut resp, &mut file)
677            .map_err(|e| format!("download write: {}", e))?;
678        eprintln!("import: cached {} GB at {}", bytes / 1_073_741_824, cache_path.display());
679    } else {
680        eprintln!("import: using cached DataCite 2025 TAR at {}", cache_path.display());
681    }
682
683    eprintln!(
684        "import: processing DataCite 2025 annual data file{}",
685        if sample { format!(" (first {} records)", limit) } else { String::new() },
686    );
687    let file = std::fs::File::open(&cache_path)
688        .map_err(|e| format!("open cache: {}", e))?;
689    process_datacite_archive(Archive::new(file), limit, sample)
690}
691
692fn process_datacite_archive<R: std::io::Read>(mut archive: tar::Archive<R>, limit: usize, is_sample: bool) -> Result<(), String> {
693    use flate2::read::GzDecoder;
694    use std::io::{BufRead, BufReader, Read as _, Cursor};
695    use rayon::prelude::*;
696
697    let out_path = resolve_db_path(None);
698    let out_sqlite = Path::new(&out_path);
699    let mut total_records = 0usize;
700    let start = Instant::now();
701    let mut file_count = 0usize;
702
703    'entries: for entry_result in archive.entries().map_err(|e| format!("TAR read: {}", e))? {
704        let mut entry = match entry_result {
705            Ok(e) => e,
706            Err(e) => {
707                // A truncated TAR (incomplete download) produces "unexpected EOF"
708                // when the iterator tries to skip past a corrupt entry's data.
709                // Treat this as a clean end-of-archive rather than a hard error
710                // so that the records already written to SQLite are reported.
711                eprintln!("import: TAR truncated after {} files ({} records) — {}", file_count, total_records, e);
712                break 'entries;
713            }
714        };
715        let name = entry.path()
716            .map(|p| p.to_string_lossy().into_owned())
717            .unwrap_or_default();
718        if !name.ends_with(".jsonl.gz") {
719            continue;
720        }
721
722        let mut compressed: Vec<u8> = Vec::new();
723        if let Err(e) = entry.read_to_end(&mut compressed) {
724            eprintln!("import: read error in {}: {}", name, e);
725            break 'entries;
726        }
727
728        let mut decompressed: Vec<u8> = Vec::new();
729        if let Err(e) = GzDecoder::new(Cursor::new(&compressed)).read_to_end(&mut decompressed) {
730            eprintln!("import: decompress error in {}: {}", name, e);
731            continue;
732        }
733        if decompressed.is_empty() {
734            continue;
735        }
736        file_count += 1;
737
738        let take = limit.saturating_sub(total_records);
739        // JSONL records in the annual file are the `data` object without the
740        // outer {"data":...} wrapper that the DataCite API response and reader expect.
741        let lines: Vec<String> = BufReader::new(Cursor::new(decompressed))
742            .lines()
743            .filter_map(|l| l.ok())
744            .map(|l| l.trim().to_string())
745            .filter(|l| !l.is_empty())
746            .take(take)
747            .collect();
748
749        if lines.is_empty() {
750            continue;
751        }
752
753        let batch: Vec<crate::Data> = lines
754            .par_iter()
755            .filter_map(|trimmed| {
756                let input = format!("{{\"data\":{}}}", trimmed);
757                crate::read("datacite", &input).ok()
758            })
759            .collect();
760
761        let n = batch.len();
762        if n > 0 {
763            crate::upsert_sqlite(&batch, out_sqlite)
764                .map_err(|e| format!("upsert failed after {}: {}", name, e))?;
765            total_records += n;
766        }
767        eprintln!("import: {} — {} records ({} total in {:.0?})", name, n, total_records, start.elapsed());
768
769        if total_records >= limit {
770            break 'entries;
771        }
772    }
773
774    let db_total = crate::count_sqlite_works(out_sqlite).ok();
775    eprintln!("import: {} files, {} records in {:.2?}", file_count, total_records, start.elapsed());
776    if !is_sample {
777        let _ = crate::set_sqlite_setting(out_sqlite, "datacite_annual_date", "2025");
778    }
779    eprintln!("import: rebuilding FTS index…");
780    crate::rebuild_works_fts(out_sqlite).map_err(|e| format!("FTS rebuild: {e}"))?;
781    println!("{}", fmt_wrote_sqlite(&out_path, total_records, db_total));
782    Ok(())
783}
784
785const CROSSREF_S3_BUCKET: &str = "api-snapshots-reqpays-crossref";
786const CROSSREF_S3_KEY: &str = "March_2026_Public_Data_File_from_Crossref.tar";
787
788/// Download the Crossref annual public data file from the S3 requester-pays bucket
789/// and import it. Requires the `aws` CLI configured with valid AWS credentials;
790/// the requester pays bandwidth costs (~$18 for the ~208 GB 2026 file).
791///
792/// The TAR is cached at `~/.cache/commonmeta/crossref/crossref-annual-s3.tar`.
793/// When `sample` is true and the cache exists, processing stops after 1 000 records.
794/// When `sample` is true and no cache exists, an error is returned — use the torrent
795/// path (`commonmeta import --from crossref --sample`) for quick smoke-tests.
796fn import_crossref_s3(sample: bool, no_network: bool) -> Result<(), String> {
797    const CROSSREF_S3_SAMPLE_LINES: usize = 1_000;
798    let limit = if sample { CROSSREF_S3_SAMPLE_LINES } else { usize::MAX };
799    let cache_path = io_utils::cache_dir("crossref").join("crossref-annual-s3.tar");
800    let cached = cache_path.exists() && cache_path.metadata().map(|m| m.len()).unwrap_or(0) > 0;
801
802    if sample && !cached {
803        return Err(format!(
804            "import: --s3 --sample requires the cached TAR at {}.\n\
805             Run without --sample first to download the full archive, then use --sample \
806             to test against the cached data.\n\
807             For a quick smoke-test without downloading, use:\n\
808             commonmeta import --from crossref --sample   (torrent, first 5 files)",
809            cache_path.display()
810        ));
811    }
812
813    if !cached {
814        if no_network {
815            return Err(format!(
816                "--no-network: no cached Crossref S3 TAR at {}",
817                cache_path.display()
818            ));
819        }
820
821        // Verify aws CLI is available.
822        if std::process::Command::new("aws").arg("--version").output().is_err() {
823            return Err(
824                "import: aws CLI not found — install it from https://aws.amazon.com/cli/ \
825                and configure credentials before using --s3"
826                    .to_string(),
827            );
828        }
829
830        let key = std::env::var("CROSSREF_S3_KEY")
831            .unwrap_or_else(|_| CROSSREF_S3_KEY.to_string());
832
833        if let Some(parent) = cache_path.parent() {
834            std::fs::create_dir_all(parent).map_err(|e| format!("mkdir: {}", e))?;
835        }
836        eprintln!(
837            "import: downloading s3://{}/{} (~208 GB) to {}\n\
838             Note: this is a requester-pays bucket; bandwidth costs (~$18) are charged to your AWS account.",
839            CROSSREF_S3_BUCKET, key, cache_path.display()
840        );
841        let status = std::process::Command::new("aws")
842            .args([
843                "s3", "cp",
844                "--request-payer", "requester",
845                &format!("s3://{}/{}", CROSSREF_S3_BUCKET, key),
846                &cache_path.to_string_lossy(),
847            ])
848            .status()
849            .map_err(|e| format!("aws s3 cp failed: {}", e))?;
850        if !status.success() {
851            return Err(
852                "aws s3 cp failed — check AWS credentials, IAM permissions, \
853                and that you accept requester-pays charges for this bucket"
854                    .to_string(),
855            );
856        }
857        eprintln!("import: cached Crossref annual TAR at {}", cache_path.display());
858    } else {
859        eprintln!("import: using cached Crossref annual TAR at {}", cache_path.display());
860    }
861
862    eprintln!(
863        "import: processing Crossref annual data file{}",
864        if sample { format!(" (first {} records)", limit) } else { String::new() },
865    );
866    let file = std::fs::File::open(&cache_path)
867        .map_err(|e| format!("open cache: {}", e))?;
868    process_crossref_s3_archive(tar::Archive::new(file), limit, sample)
869}
870
871fn process_crossref_s3_archive<R: std::io::Read>(mut archive: tar::Archive<R>, limit: usize, is_sample: bool) -> Result<(), String> {
872    use flate2::read::GzDecoder;
873    use std::io::{BufRead, BufReader, Read as _, Cursor};
874    use rayon::prelude::*;
875
876    let out_path = resolve_db_path(None);
877    let out_sqlite = Path::new(&out_path);
878    let mut total_records = 0usize;
879    let start = Instant::now();
880    let mut file_count = 0usize;
881
882    'entries: for entry_result in archive.entries().map_err(|e| format!("TAR read: {}", e))? {
883        let mut entry = match entry_result {
884            Ok(e) => e,
885            Err(e) => {
886                eprintln!("import: TAR truncated after {} files ({} records) — {}", file_count, total_records, e);
887                break 'entries;
888            }
889        };
890        let name = entry.path()
891            .map(|p| p.to_string_lossy().into_owned())
892            .unwrap_or_default();
893
894        // Support both compressed (.jsonl.gz) and plain (.jsonl / .json) entries.
895        let is_gz = name.ends_with(".jsonl.gz") || name.ends_with(".json.gz");
896        let is_jsonl = name.ends_with(".jsonl") || name.ends_with(".json");
897        if !is_gz && !is_jsonl {
898            continue;
899        }
900
901        let mut raw: Vec<u8> = Vec::new();
902        if let Err(e) = entry.read_to_end(&mut raw) {
903            eprintln!("import: read error in {}: {}", name, e);
904            break 'entries;
905        }
906        if raw.is_empty() {
907            continue;
908        }
909
910        let decompressed: Vec<u8> = if is_gz {
911            let mut buf = Vec::new();
912            if let Err(e) = GzDecoder::new(Cursor::new(&raw)).read_to_end(&mut buf) {
913                eprintln!("import: decompress error in {}: {}", name, e);
914                continue;
915            }
916            buf
917        } else {
918            raw
919        };
920        if decompressed.is_empty() {
921            continue;
922        }
923        file_count += 1;
924
925        let take = limit.saturating_sub(total_records);
926        let lines: Vec<String> = BufReader::new(Cursor::new(decompressed))
927            .lines()
928            .filter_map(|l| l.ok())
929            .map(|l| l.trim().to_string())
930            .filter(|l| !l.is_empty())
931            .take(take)
932            .collect();
933
934        if lines.is_empty() {
935            continue;
936        }
937
938        let batch: Vec<crate::Data> = lines
939            .par_iter()
940            .filter_map(|trimmed| {
941                let input = format!(r#"{{"message":{}}}"#, trimmed);
942                crate::read("crossref", &input).ok()
943            })
944            .collect();
945
946        let n = batch.len();
947        if n > 0 {
948            crate::upsert_sqlite(&batch, out_sqlite)
949                .map_err(|e| format!("upsert failed after {}: {}", name, e))?;
950            total_records += n;
951        }
952        eprintln!("import: {} — {} records ({} total in {:.0?})", name, n, total_records, start.elapsed());
953
954        if total_records >= limit {
955            break 'entries;
956        }
957    }
958
959    let db_total = crate::count_sqlite_works(out_sqlite).ok();
960    eprintln!("import: {} files, {} records in {:.2?}", file_count, total_records, start.elapsed());
961    if !is_sample {
962        let _ = crate::set_sqlite_setting(out_sqlite, "crossref_annual_date", "2026-03");
963    }
964    eprintln!("import: rebuilding FTS index…");
965    crate::rebuild_works_fts(out_sqlite).map_err(|e| format!("FTS rebuild: {e}"))?;
966    println!("{}", fmt_wrote_sqlite(&out_path, total_records, db_total));
967    Ok(())
968}
969
970/// Copy all records from a commonmeta-format SQLite file into the local database.
971fn import_commonmeta_sqlite(src_path: &str, out_path: &str) -> Result<(), String> {
972    let total_start = Instant::now();
973    let src = Path::new(src_path);
974    let out = Path::new(out_path);
975
976    let read_start = Instant::now();
977    let data = crate::read_sqlite_commonmeta(src, None, 0)
978        .map_err(|e| format!("failed to read '{}': {}", src_path, e))?;
979    eprintln!("import: read {} records in {:.2?}", data.len(), read_start.elapsed());
980
981    let write_start = Instant::now();
982    crate::upsert_sqlite(&data, out).map_err(|e| e.to_string())?;
983    let total = crate::count_sqlite_works(out).ok();
984    eprintln!("import: upsert took {:.2?}", write_start.elapsed());
985    eprintln!("import: total {:.2?}", total_start.elapsed());
986    println!("{}", fmt_wrote_sqlite(out_path, data.len(), total));
987    Ok(())
988}
989
990/// Fetch a single record by DOI (or other identifier), upsert it and its
991/// referenced works into SQLite, then print a commonmeta JSON array
992/// ([work, ref1, ref2, …]) to stdout.
993fn import_single(identifier: &str, from: &str, out_path: &str) -> Result<(), String> {
994    let fetch_start = Instant::now();
995    let mut data = crate::read(from, identifier).map_err(|e| e.to_string())?;
996    eprintln!("import: fetch took {:.2?}", fetch_start.elapsed());
997
998    let out_sqlite = Path::new(out_path);
999
1000    // Fetch references (SQLite batch + network fallback).
1001    let ref_works = crate::fetch_reference_works(&data, Some(out_sqlite), false);
1002
1003    // Upsert unenriched data — citations are derived from the junction table,
1004    // not stored in the metadata blob.
1005    let write_start = Instant::now();
1006    let all: Vec<crate::Data> = std::iter::once(data.clone())
1007        .chain(ref_works.clone())
1008        .collect();
1009    crate::upsert_sqlite(&all, out_sqlite).map_err(|e| e.to_string())?;
1010    eprintln!("import: upsert took {:.2?} ({} records)", write_start.elapsed(), all.len());
1011
1012    // Enrich citations from junction table for output only (not persisted).
1013    crate::enrich_citations(&mut data, out_sqlite);
1014
1015    // Serialize and print commonmeta JSON array. References are stripped from
1016    // each item (second-order children not shown at this level).
1017    let mut main_prepared = crate::prepare_commonmeta(&data);
1018    main_prepared.references.clear();
1019    let mut items = vec![serde_json::to_value(main_prepared).map_err(|e| e.to_string())?];
1020    for work in &ref_works {
1021        let mut prepared = crate::prepare_commonmeta(work);
1022        prepared.references.clear();
1023        items.push(serde_json::to_value(prepared).map_err(|e| e.to_string())?);
1024    }
1025    let output = serde_json::to_vec_pretty(&items).map_err(|e| e.to_string())?;
1026    println!("{}", String::from_utf8_lossy(&output));
1027    Ok(())
1028}
1029
1030/// Import records from the cache SQLite database into the commonmeta works
1031/// database, then flush the cache.
1032///
1033/// The cache uses the VRAIX transport schema (`pid`, `source_id`,
1034/// `raw_metadata`) and may contain mixed crossref and datacite records; each
1035/// row is routed to the appropriate parser by its `source_id`.
1036///
1037/// The cache is flushed (all rows removed + VACUUM) only after a successful
1038/// upsert so that a failed import leaves the cache intact for the next run.
1039fn import_cache(cache_path: &str, out_path: &str, people_path: &str) -> Result<(), String> {
1040    let cache_sqlite = std::path::Path::new(cache_path);
1041    let out_sqlite = std::path::Path::new(out_path);
1042    let people_sqlite = std::path::Path::new(people_path);
1043
1044    if !cache_sqlite.exists() {
1045        return Err(format!(
1046            "import: cache not found at {}\n\
1047             Set CACHE_DB or use --cache-db to specify the path",
1048            cache_path
1049        ));
1050    }
1051
1052    eprintln!("import: reading from cache at {}", cache_path);
1053    let start = Instant::now();
1054
1055    let n = crate::stream_pidbox_to_sqlite(cache_sqlite, out_sqlite, 0, true)
1056        .map_err(|e| format!("import: cache stream failed: {}", e))?;
1057
1058    let total = crate::count_sqlite_works(out_sqlite).ok();
1059    eprintln!("import: imported {} work records in {:.2?}", n, start.elapsed());
1060
1061    let n_people = crate::stream_cache_orcid_to_people_sqlite(cache_sqlite, people_sqlite)
1062        .map_err(|e| format!("import: cache orcid stream failed: {}", e))?;
1063    if n_people > 0 {
1064        eprintln!("import: imported {} person records into {}", n_people, people_path);
1065    }
1066
1067    let deleted = crate::flush_dragoman_cache(cache_sqlite)
1068        .map_err(|e| format!("import: flush cache failed: {}", e))?;
1069    eprintln!("import: flushed {} rows from cache", deleted);
1070
1071    println!("{}", fmt_wrote_sqlite(out_path, n, total));
1072    Ok(())
1073}
1074
1075fn import_pubmed(
1076    explicit_csv: Option<&str>,
1077    out_path: &str,
1078    no_network: bool,
1079) -> Result<(), String> {
1080    let gz_path = crate::pubmed::resolve_pmc_ids_path(explicit_csv, no_network)
1081        .map_err(|e| format!("import: {e}"))?;
1082    let out_sqlite = std::path::Path::new(out_path);
1083
1084    eprintln!("import: reading PMC-ids from {}", gz_path.display());
1085    let start = Instant::now();
1086
1087    let n = crate::stream_pmc_ids_to_sqlite(&gz_path, out_sqlite, 0, no_network)
1088        .map_err(|e| format!("import: PMC-ids stream failed: {e}"))?;
1089
1090    let total = crate::count_sqlite_works(out_sqlite).ok();
1091    eprintln!("import: imported {} records in {:.2?}", n, start.elapsed());
1092    println!("{}", fmt_wrote_sqlite(out_path, n, total));
1093    Ok(())
1094}
1095
1096fn import_vraix_fast(
1097    from: &str,
1098    input_path: Option<&str>,
1099    date: Option<&str>,
1100    out_path: &str,
1101) -> Result<(), String> {
1102    let total_start = Instant::now();
1103    let out_sqlite = std::path::PathBuf::from(out_path);
1104
1105    // Resolve the VRAIX input to a local .sqlite3 path, downloading and
1106    // decompressing on demand when only --date was given.
1107    let (in_sqlite, tmp_to_clean) = if date.is_some() && input_path.is_none() {
1108        let date = date.unwrap();
1109        let url = format!("https://metadata.vraix.org/{}-{}.sqlite3.zst", from, date);
1110        let cache_key = format!("{}-{}.sqlite3.zst", from, date);
1111        let dl_start = Instant::now();
1112        let (cache_path, from_cache) =
1113            io_utils::ensure_cached_path(&url, "vraix", &cache_key, VRAIX_CACHE_TTL)
1114                .map_err(|e| format!("failed to download '{}': {}", url, e))?;
1115        let size = cache_path.metadata().map(|m| m.len()).unwrap_or(0);
1116        eprintln!(
1117            "import: download took {:.2?} ({} bytes{})",
1118            dl_start.elapsed(),
1119            size,
1120            if from_cache { ", from cache" } else { "" }
1121        );
1122        let dc_start = Instant::now();
1123        let tmp = out_sqlite.with_extension(format!("sqlite3.vraix-{}.tmp", std::process::id()));
1124        let dc_bytes = io_utils::decompress_zst_file(&cache_path, &tmp)
1125            .map_err(|e| format!("failed to decompress '{}': {}", url, e))?;
1126        eprintln!(
1127            "import: decompress took {:.2?} ({} bytes)",
1128            dc_start.elapsed(),
1129            dc_bytes
1130        );
1131        (tmp.clone(), Some(tmp))
1132    } else {
1133        (std::path::PathBuf::from(input_path.unwrap()), None)
1134    };
1135
1136    let convert_start = Instant::now();
1137    let result = crate::stream_vraix_to_sqlite(&in_sqlite, from, &out_sqlite, 0, true)
1138        .map_err(|e| e.to_string());
1139    if let Some(tmp) = tmp_to_clean {
1140        std::fs::remove_file(&tmp).ok();
1141    }
1142    let n = result?;
1143    let total = crate::count_sqlite_works(&out_sqlite).ok();
1144    eprintln!(
1145        "import: convert+write took {:.2?} ({} records)",
1146        convert_start.elapsed(),
1147        n
1148    );
1149    eprintln!("import: total took {:.2?}", total_start.elapsed());
1150    println!("{}", fmt_wrote_sqlite(out_path, n, total));
1151    Ok(())
1152}
1153
1154pub(crate) fn install_ror(out_path: &str, force: bool) -> Result<(), String> {
1155    let total = Instant::now();
1156
1157    eprintln!("Fetching latest ROR release metadata from Zenodo...");
1158    let t = Instant::now();
1159    let release = crate::fetch_latest_ror_release().map_err(|e| e.to_string())?;
1160    eprintln!("  metadata fetched in {:.2}s", t.elapsed().as_secs_f64());
1161
1162    let db_path = Path::new(out_path);
1163    match crate::fetch_installed_ror_version(db_path).map_err(|e| e.to_string())? {
1164        Some(ref installed) if installed == &release.version && !force => {
1165            println!(
1166                "ROR {} ({}) is already installed at {}",
1167                release.version, release.date, out_path
1168            );
1169            return Ok(());
1170        }
1171        Some(ref installed) if installed == &release.version => {
1172            eprintln!("Re-importing ROR {} ({}) (--force)...", installed, release.version);
1173        }
1174        Some(ref installed) => {
1175            eprintln!("Upgrading ROR {} → {}...", installed, release.version);
1176        }
1177        None => {}
1178    }
1179
1180    let t = Instant::now();
1181    let (list, from_cache) =
1182        crate::download_ror_release(&release).map_err(|e| e.to_string())?;
1183    eprintln!(
1184        "  {} and parsed {} organizations in {:.2}s",
1185        if from_cache { "loaded" } else { "downloaded" },
1186        list.len(),
1187        t.elapsed().as_secs_f64()
1188    );
1189
1190    eprintln!("Writing to {}...", out_path);
1191    let t = Instant::now();
1192    crate::write_ror_sqlite(&list, db_path, Some(&release.version), Some(&release.date))
1193        .map_err(|e| e.to_string())?;
1194    eprintln!("  SQLite written in {:.2}s", t.elapsed().as_secs_f64());
1195    eprintln!("  total: {:.2}s", total.elapsed().as_secs_f64());
1196
1197    println!(
1198        "Installed ROR {} ({}) → {} ({} organizations)",
1199        release.version,
1200        release.date,
1201        out_path,
1202        list.len(),
1203    );
1204    Ok(())
1205}
1206
1207pub(crate) fn install_geonames(out_path: &str, force: bool) -> Result<(), String> {
1208    let total = Instant::now();
1209    let db_path = Path::new(out_path);
1210
1211    let today = chrono::Utc::now().format("%Y-%m-%d").to_string();
1212    if !force {
1213        if let Ok(Some(ref installed)) = crate::fetch_installed_geonames_date(db_path) {
1214            if installed == &today {
1215                println!("GeoNames ({}) is already installed at {}", installed, out_path);
1216                return Ok(());
1217            }
1218        }
1219    } else if let Ok(Some(ref installed)) = crate::fetch_installed_geonames_date(db_path) {
1220        eprintln!("Re-importing GeoNames ({}) (--force)...", installed);
1221    }
1222
1223    eprintln!("Downloading GeoNames cities500 dump...");
1224    let t = Instant::now();
1225    let (count, from_cache) =
1226        crate::install_geonames_sqlite(db_path, Some(&today)).map_err(|e| e.to_string())?;
1227    eprintln!(
1228        "  {} and parsed {} places in {:.2}s",
1229        if from_cache { "loaded" } else { "downloaded" },
1230        count,
1231        t.elapsed().as_secs_f64()
1232    );
1233    eprintln!("  total: {:.2}s", total.elapsed().as_secs_f64());
1234
1235    println!(
1236        "Installed GeoNames ({}) → {} ({} places)",
1237        today, out_path, count,
1238    );
1239    Ok(())
1240}
1241
1242pub(crate) fn install_pidbox(out_path: &str) -> Result<(), String> {
1243    let total = Instant::now();
1244
1245    eprintln!("Downloading pidbox from {}...", PIDBOX_URL);
1246    let t = Instant::now();
1247    let (cache_path, from_cache) =
1248        io_utils::ensure_cached_path(PIDBOX_URL, "vraix", PIDBOX_CACHE_KEY, VRAIX_CACHE_TTL)
1249            .map_err(|e| format!("failed to download pidbox: {}", e))?;
1250    if from_cache {
1251        eprintln!("  pidbox download skipped (cached at {})", cache_path.display());
1252    } else {
1253        eprintln!("  downloaded in {:.2}s", t.elapsed().as_secs_f64());
1254    }
1255
1256    // The pidbox SQLite database is not VACUUM'd, so overflow pages for large
1257    // records appear in reverse page-number order.  stream_zst_pidbox_to_sqlite
1258    // uses a sliding window buffer (default 32 GiB RAM + 500 GiB disk) to
1259    // resolve backward chain links without extra full-file scans.
1260    // Tune with COMMONMETA_SCAN_WINDOW_GIB and COMMONMETA_SCAN_DISK_GIB.
1261    let out = Path::new(out_path);
1262    eprintln!("Converting (streaming decompress + convert) → {}…", out_path);
1263    let t = Instant::now();
1264    let n = crate::stream_zst_pidbox_to_sqlite(&cache_path, out, 0)
1265        .map_err(|e| format!("failed to convert pidbox: {}", e))?;
1266    eprintln!("  converted and wrote {} records in {:.0}s", n, t.elapsed().as_secs_f64());
1267    eprintln!("  total: {:.0}s", total.elapsed().as_secs_f64());
1268
1269    let date = crate::fetch_installed_vraix_date(out)
1270        .ok()
1271        .flatten()
1272        .map(|d| format!(", vraix_date: {d}"))
1273        .unwrap_or_default();
1274    println!("Installed pidbox → {} ({} records{})", out_path, n, date);
1275    Ok(())
1276}
1277
1278/// Fetch works for a ROR organization from Crossref + DataCite, upsert them
1279/// into the works SQLite database, and print a commonmeta JSON array
1280/// ([org, work1, …]) to stdout. `number = 0` defaults to 10; `page` is 1-based.
1281fn import_ror_works(ror_url: &str, works_db: &str, number: usize, page: usize) -> Result<(), String> {
1282    let ror = crate::utils::normalize_ror(ror_url);
1283    if ror.is_empty() {
1284        return Err(format!("import: '{}' is not a valid ROR identifier", ror_url));
1285    }
1286    let limit = if number == 0 { 10 } else { number };
1287    let t = Instant::now();
1288
1289    let org = crate::fetch_ror(&ror).map_err(|e| e.to_string())?;
1290
1291    let mut works: Vec<crate::Data> = Vec::new();
1292    let mut cr = crate::fetch_crossref_by_ror(&ror, limit, page).unwrap_or_default();
1293    let mut dc = crate::fetch_datacite_by_ror(&ror, limit, page).unwrap_or_default();
1294    works.append(&mut cr);
1295    works.append(&mut dc);
1296
1297    if !works.is_empty() {
1298        let mut seen = std::collections::HashSet::new();
1299        works.retain(|d| seen.insert(d.id.clone()));
1300        works.sort_by(|a, b| b.date_published.cmp(&a.date_published));
1301        works.truncate(limit);
1302        crate::upsert_sqlite(&works, Path::new(works_db)).map_err(|e| e.to_string())?;
1303    }
1304
1305    eprintln!("ror: imported {} works for {} in {:.2?}", works.len(), ror, t.elapsed());
1306
1307    // Serialize and print the commonmeta JSON array.
1308    let org_val = serde_json::to_value(&org).map_err(|e| e.to_string())?;
1309    let mut items = vec![org_val];
1310    for work in &works {
1311        let prepared = crate::prepare_commonmeta(work);
1312        items.push(serde_json::to_value(&prepared).map_err(|e| e.to_string())?);
1313    }
1314    let output = serde_json::to_vec_pretty(&items).map_err(|e| e.to_string())?;
1315    println!("{}", String::from_utf8_lossy(&output));
1316    Ok(())
1317}
1318
1319/// Fetch a single ORCID person from the public API, upsert into `people_db`,
1320/// and fetch their works from Crossref + DataCite and upsert into `works_db`.
1321fn import_orcid_person(orcid_url: &str, people_db: &str, works_db: &str) -> Result<(), String> {
1322    let t = Instant::now();
1323    let n_works = crate::import_orcid_person(
1324        orcid_url,
1325        Path::new(people_db),
1326        Path::new(works_db),
1327    )
1328    .map_err(|e| e.to_string())?;
1329    eprintln!("orcid: imported {} in {:.2?}", orcid_url, t.elapsed());
1330    println!(
1331        "orcid: {} → {} ({} works → {})",
1332        orcid_url, people_db, n_works, works_db
1333    );
1334    Ok(())
1335}
1336
#[cfg(test)]
mod tests {
    use super::*;

    /// Parse a command line with the `import` command definition; panics when
    /// the arguments are rejected.
    fn parse_args(args: &[&str]) -> clap::ArgMatches {
        let parsed = command().try_get_matches_from(args);
        parsed.expect("arg parse failed")
    }

    /// A bare DOI needs the network, so `--no-network` must be named in the error.
    #[test]
    fn test_no_network_with_doi_errors() {
        let err = execute(&parse_args(&["import", "--no-network", "10.7554/elife.01567"]))
            .unwrap_err();
        let mentions_flag = err.contains("--no-network");
        assert!(mentions_flag, "expected --no-network in error, got: {err}");
    }

    /// An API-backed `--ror` import must also be rejected under `--no-network`.
    #[test]
    fn test_no_network_with_api_fetch_errors() {
        let argv = ["import", "--no-network", "--from", "crossref", "--ror", "00pd74e08"];
        let err = execute(&parse_args(&argv)).unwrap_err();
        let mentions_flag = err.contains("--no-network");
        assert!(mentions_flag, "expected --no-network in error, got: {err}");
    }

    #[test]
    fn test_no_network_with_local_sqlite_passes_guard() {
        // Use a generic .sqlite3 name (not the crossref-/datacite- VRAIX pattern)
        // so the guard passes and the command fails at the "from commonmeta requires
        // a .sqlite3 path" check rather than entering the slow streaming path.
        let err = execute(&parse_args(&["import", "--no-network", "local.sqlite3"]))
            .unwrap_err();
        let blocked_by_guard = err.contains("--no-network");
        assert!(
            !blocked_by_guard,
            "should not fail at network guard for local sqlite, got: {err}"
        );
    }
}