use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use camino::Utf8PathBuf;
#[cfg(feature = "citation")]
use doiget_core::http::tier_2_allowlist;
use doiget_core::http::{oa_publisher_allowlist, tier_1_allowlist, HttpClient};
use doiget_core::orchestrator::{fetch_paper as core_fetch_paper, FetchPaperOutcome, PdfLegStatus};
use doiget_core::provenance::{Capability, LogEvent, LogResult, ProvenanceLog, RowInput};
use doiget_core::rate_limiter::RateLimiter;
use doiget_core::source::{FetchContext, FetchError};
use doiget_core::store::FsStore;
use doiget_core::{CapabilityProfile, DenialContext, ErrorCode, RateLimits, Ref};
/// Generates a fresh ULID and returns its string form.
///
/// The value identifies one CLI session: it is handed to the provenance log
/// when the log is opened and copied into every fetch context.
fn new_session_id() -> String {
    let id = ulid::Ulid::new();
    id.to_string()
}
pub use doiget_core::dry_run::{
build_dry_run_envelope, build_fetch_plan, FetchPlan, PdfSourcePlan, RateLimitBudget,
};
#[allow(clippy::print_stdout)]
pub fn emit_dry_run_plan_to_stdout(ref_: &Ref, plan: &FetchPlan) -> Result<()> {
let envelope = build_dry_run_envelope(ref_, plan);
let s = serde_json::to_string(&envelope).context("serializing dry-run envelope to JSON")?;
println!("{s}");
Ok(())
}
/// Resolves where the access log lives.
///
/// `DOIGET_LOG_PATH` wins when it is set; otherwise the path is
/// `<config dir>/doiget/access.jsonl`, with the config dir taken from
/// [`config_dir_utf8`].
///
/// # Errors
///
/// Propagates failures from reading the environment or locating the config
/// directory.
fn resolve_log_path() -> Result<Utf8PathBuf> {
    match read_env_utf8("DOIGET_LOG_PATH")? {
        Some(explicit) => Ok(Utf8PathBuf::from(explicit)),
        None => config_dir_utf8().map(|base| base.join("doiget").join("access.jsonl")),
    }
}
/// Reads the environment variable `key` as UTF-8 text.
///
/// Returns `Ok(Some(value))` when the variable is set, `Ok(None)` when it is
/// not present, and an error when it is set to something that is not valid
/// Unicode.
fn read_env_utf8(key: &str) -> Result<Option<String>> {
    std::env::var(key).map(Some).or_else(|err| match err {
        std::env::VarError::NotPresent => Ok(None),
        std::env::VarError::NotUnicode(_) => Err(anyhow!("{key} is not valid UTF-8")),
    })
}
/// Locates the user's home directory from the environment.
///
/// `HOME` is consulted first, then `USERPROFILE`; the first one that is set
/// is returned as-is.
///
/// # Errors
///
/// Fails when a consulted variable is not valid UTF-8, or when neither
/// variable is set.
fn home_dir_utf8() -> Result<Utf8PathBuf> {
    for key in ["HOME", "USERPROFILE"] {
        if let Some(dir) = read_env_utf8(key)? {
            return Ok(Utf8PathBuf::from(dir));
        }
    }
    Err(anyhow!("neither HOME nor USERPROFILE is set"))
}
/// Locates the per-user configuration directory.
///
/// Candidates, in order: `XDG_CONFIG_HOME`, `APPDATA`, and finally
/// `<home>/.config` with the home directory taken from [`home_dir_utf8`].
///
/// A variable that is set to the empty string is treated as unset. The XDG
/// Base Directory Specification requires this for `XDG_CONFIG_HOME`; without
/// it an empty value would produce a relative path that silently depends on
/// the current working directory.
///
/// # Errors
///
/// Fails when a consulted variable is not valid UTF-8, or when the fallback
/// to the home directory fails.
fn config_dir_utf8() -> Result<Utf8PathBuf> {
    for key in ["XDG_CONFIG_HOME", "APPDATA"] {
        match read_env_utf8(key)? {
            Some(dir) if !dir.is_empty() => return Ok(Utf8PathBuf::from(dir)),
            // Unset or empty: try the next candidate.
            _ => {}
        }
    }
    Ok(home_dir_utf8()?.join(".config"))
}
/// Builds the HTTP client used for all requests of a fetch.
///
/// With none of the `DOIGET_*_BASE` variables set, the client is created by
/// `HttpClient::new` from the tier-1 and OA-publisher allowlists (plus the
/// tier-2 allowlist when the `citation` feature is compiled in).
///
/// When at least one override is set, the client is created by
/// `HttpClient::new_for_tests_allow_http_multi` and receives only the
/// `(source, host)` pairs of the overridden sources.
/// NOTE(review): judging by its name that constructor is meant for test
/// servers reachable over plain HTTP — confirm against `doiget_core`.
///
/// # Errors
///
/// * an override variable is set but is not valid UTF-8 (previously this was
///   silently treated as "not set", which could switch the client back to
///   the default allowlists without any diagnostic);
/// * an override is not a parseable URL, or the URL has no host;
/// * `HttpClient::new` fails.
fn build_http_client() -> Result<HttpClient> {
    // (source name, overriding environment variable), in checking order.
    const BASE_OVERRIDES: [(&str, &str); 5] = [
        ("arxiv", "DOIGET_ARXIV_BASE"),
        ("crossref", "DOIGET_CROSSREF_BASE"),
        ("unpaywall", "DOIGET_UNPAYWALL_BASE"),
        ("oa-publisher", "DOIGET_OA_PUBLISHER_BASE"),
        ("openalex", "DOIGET_OPENALEX_BASE"),
    ];

    let mut overrides: Vec<(&str, String)> = Vec::new();
    for (source, var) in BASE_OVERRIDES {
        // Same policy and message as `read_env_utf8`: absent is fine, a value
        // that is not valid Unicode is an error rather than "absent".
        let b = match std::env::var(var) {
            Ok(value) => value,
            Err(std::env::VarError::NotPresent) => continue,
            Err(std::env::VarError::NotUnicode(_)) => {
                return Err(anyhow!("{var} is not valid UTF-8"));
            }
        };
        let url = url::Url::parse(&b)
            .with_context(|| format!("DOIGET_*_BASE for {source} is not a URL: {b}"))?;
        let host = url
            .host_str()
            .ok_or_else(|| anyhow!("base URL has no host: {b}"))?;
        overrides.push((source, host.to_string()));
    }

    // No override present: use the built-in allowlists.
    if overrides.is_empty() {
        let mut allowlists = tier_1_allowlist();
        allowlists.extend(oa_publisher_allowlist());
        #[cfg(feature = "citation")]
        allowlists.extend(tier_2_allowlist());
        return HttpClient::new(allowlists).context("building HTTP client");
    }

    let entries: Vec<(&str, &str)> = overrides
        .iter()
        .map(|(source, host)| (*source, host.as_str()))
        .collect();
    Ok(HttpClient::new_for_tests_allow_http_multi(&entries))
}
/// Environment-derived settings gathered once per CLI invocation by
/// `OrchestratorConfig::from_env`.
// NOTE(review): the two e-mail fields are not read anywhere in the visible part
// of this file, which is presumably why `dead_code` is allowed — confirm.
#[allow(dead_code)]
pub(crate) struct OrchestratorConfig {
/// Root directory of the store, as returned by `super::resolve_store_root`.
pub(crate) store_root: Utf8PathBuf,
/// Location of the provenance log, as returned by `resolve_log_path`.
pub(crate) log_path: Utf8PathBuf,
/// `DOIGET_CONTACT_EMAIL`, or `doiget@localhost` when that cannot be read.
pub(crate) contact_email: String,
/// `DOIGET_UNPAYWALL_EMAIL`, or a copy of `contact_email` when that cannot be read.
pub(crate) unpaywall_email: String,
}
impl OrchestratorConfig {
    /// Collects the configuration from the process environment.
    ///
    /// The store root and log path are resolved first; both may fail. The two
    /// e-mail settings never fail: a variable that cannot be read falls back
    /// to `doiget@localhost` (contact) or to the contact address (Unpaywall).
    fn from_env() -> Result<Self> {
        // Reads `key`, substituting `fallback` when the variable cannot be read.
        let env_or =
            |key: &str, fallback: &str| std::env::var(key).unwrap_or_else(|_| fallback.to_owned());

        let store_root = super::resolve_store_root()?;
        let log_path = resolve_log_path()?;
        let contact_email = env_or("DOIGET_CONTACT_EMAIL", "doiget@localhost");
        let unpaywall_email = env_or("DOIGET_UNPAYWALL_EMAIL", &contact_email);

        Ok(Self {
            store_root,
            log_path,
            contact_email,
            unpaywall_email,
        })
    }
}
/// Everything one CLI invocation needs to fetch a paper: shared network and
/// logging handles, the store, the capability profile and the session id.
/// Built by `FetchHarness::from_env`.
pub(crate) struct FetchHarness {
/// HTTP client from `build_http_client`; shared with each fetch context.
pub(crate) http: Arc<HttpClient>,
/// Rate limiter constructed from `RateLimits::HARD_CODED`.
pub(crate) rate_limiter: Arc<RateLimiter>,
/// Provenance log opened at the configured log path for this session.
pub(crate) log: Arc<ProvenanceLog>,
/// Store rooted at the configured store root.
pub(crate) store: FsStore,
/// Capability profile resolved by `CapabilityProfile::from_env`.
pub(crate) profile: CapabilityProfile,
/// Identifier produced by `new_session_id`; also passed to the log when it is opened.
pub(crate) session_id: String,
/// Configuration the harness was built from.
// NOTE(review): not read after construction in the visible code, hence the allow — confirm.
#[allow(dead_code)]
pub(crate) cfg: OrchestratorConfig,
}
impl FetchHarness {
pub(crate) fn from_env() -> Result<Self> {
let cfg = OrchestratorConfig::from_env()?;
if let Some(parent) = cfg.log_path.parent() {
if !parent.as_str().is_empty() {
std::fs::create_dir_all(parent.as_std_path())
.with_context(|| format!("creating log dir {parent}"))?;
}
}
let session_id = new_session_id();
let log = Arc::new(
ProvenanceLog::open(cfg.log_path.clone(), session_id.clone())
.context("opening provenance log")?,
);
let http = Arc::new(build_http_client()?);
let rate_limiter = Arc::new(RateLimiter::new(RateLimits::HARD_CODED));
let store = FsStore::new(cfg.store_root.clone()).context("opening store")?;
let profile = CapabilityProfile::from_env().context("resolving capability profile")?;
Ok(Self {
http,
rate_limiter,
log,
store,
profile,
session_id,
cfg,
})
}
pub(crate) fn fetch_context(&self) -> FetchContext {
FetchContext {
http: self.http.clone(),
rate_limiter: self.rate_limiter.clone(),
log: self.log.clone(),
session_id: self.session_id.clone(),
}
}
pub(crate) fn log_session_start(&self, ref_input: Option<&str>) -> Result<()> {
self.log
.append(RowInput {
event: LogEvent::SessionStart,
result: LogResult::Ok,
capability: Capability::Oa,
ref_: ref_input,
source: None,
error_code: None,
size_bytes: None,
license: None,
store_path: None,
canonical_digest: None,
})
.context("appending SessionStart row")?;
Ok(())
}
pub(crate) fn log_session_end(&self, ok: bool, ref_input: Option<&str>) {
let result = if ok { LogResult::Ok } else { LogResult::Err };
let _ = self.log.append(RowInput {
event: LogEvent::SessionEnd,
result,
capability: Capability::Oa,
ref_: ref_input,
source: None,
error_code: None,
size_bytes: None,
license: None,
store_path: None,
canonical_digest: None,
});
}
pub(crate) async fn fetch_one(&self, ref_: &Ref) -> Result<()> {
let ctx = self.fetch_context();
match core_fetch_paper(ref_, &self.profile, &ctx, &self.store, self.store.root()).await {
Ok(outcome) => {
emit_success_line(ref_, &outcome);
Ok(())
}
Err(e) => {
render_fetch_error(&e);
let code: ErrorCode = (&e).into();
Err(anyhow::Error::new(CliExit(cli_exit_code(code))))
}
}
}
}
/// Prints the human-readable summary of a successful fetch.
///
/// The wording depends on the PDF leg:
/// * `Fetched` — label, byte count and path;
/// * `NoOaUrl` — metadata-only, stating that no OA PDF is available;
/// * `Blocked` — metadata-only, followed by a note with the wire code and message;
/// * any other status — metadata-only when `size_bytes` is zero, otherwise
///   the same line as `Fetched`.
fn emit_success_line(ref_: &Ref, outcome: &FetchPaperOutcome) {
    let label = match ref_ {
        Ref::Doi(doi) => format!("doi:{}", doi.as_str()),
        Ref::Arxiv(id) => format!("arxiv:{}", id.as_str()),
    };

    // The two messages that more than one status shares.
    let with_size = || {
        print_success(format_args!(
            "fetched {} ({} bytes) -> {}",
            label, outcome.size_bytes, outcome.path
        ));
    };
    let metadata_only = || {
        print_success(format_args!(
            "fetched {} (metadata-only) -> {}",
            label, outcome.path
        ));
    };

    match &outcome.pdf_leg {
        PdfLegStatus::Fetched => with_size(),
        PdfLegStatus::NoOaUrl => print_success(format_args!(
            "fetched {} (metadata-only: no OA PDF available) -> {}",
            label, outcome.path
        )),
        PdfLegStatus::Blocked { code, message, .. } => {
            metadata_only();
            print_success(format_args!(
                " note: an OA PDF was found but could not be retrieved [{}]: {}",
                code.as_wire(),
                message
            ));
        }
        // Statuses without a dedicated message: decide by whether bytes were stored.
        _ if outcome.size_bytes == 0 => metadata_only(),
        _ => with_size(),
    }
}
pub async fn run_with_options(input: String, dry_run: bool) -> Result<()> {
let ref_ = match Ref::parse(&input) {
Ok(r) => r,
Err(e) => {
print_err(format_args!(
"error[{}]: invalid ref: {e}",
ErrorCode::InvalidRef.as_wire()
));
return Err(anyhow::Error::new(CliExit(cli_exit_code(
ErrorCode::InvalidRef,
))));
}
};
if dry_run {
let store_root = super::resolve_store_root()?;
let plan = build_fetch_plan(&ref_, &store_root);
emit_dry_run_plan_to_stdout(&ref_, &plan)?;
return Ok(());
}
let harness = FetchHarness::from_env()?;
harness.log_session_start(Some(ref_.as_input_str()))?;
let result = harness.fetch_one(&ref_).await;
harness.log_session_end(result.is_ok(), Some(ref_.as_input_str()));
result
}
/// Writes one success/status line, followed by a newline, to stderr.
// NOTE(review): success lines go to stderr just like errors; presumably stdout
// is kept for machine-readable output such as the dry-run JSON — confirm.
// The allow is needed because this is a deliberate `eprintln!` call site.
#[allow(clippy::print_stderr)]
fn print_success(args: std::fmt::Arguments<'_>) {
eprintln!("{args}");
}
/// Writes one diagnostic line, followed by a newline, to stderr.
// The allow is needed because this is a deliberate `eprintln!` call site.
#[allow(clippy::print_stderr)]
fn print_err(args: std::fmt::Arguments<'_>) {
eprintln!("{args}");
}
/// Error that carries the process exit status the CLI should end with.
///
/// Within this file it is only constructed after the user-facing diagnostic
/// has already been printed, so its `Display` text is just a short summary.
#[derive(Debug)]
pub struct CliExit(pub i32);

impl std::error::Error for CliExit {}

impl std::fmt::Display for CliExit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(status) = self;
        write!(f, "exiting with status {status}")
    }
}
/// Maps a wire-level [`ErrorCode`] to the process exit status.
///
/// * capability denied → 3
/// * store or log failure → 4
/// * fetch timeout → 124
/// * every other code → 1
fn cli_exit_code(code: ErrorCode) -> i32 {
    const GENERIC_FAILURE: i32 = 1;
    const CAPABILITY_DENIED: i32 = 3;
    const STORE_OR_LOG_FAILURE: i32 = 4;
    // NOTE(review): presumably chosen to match the status `timeout(1)` uses — confirm.
    const TIMED_OUT: i32 = 124;

    match code {
        ErrorCode::FetchTimeout => TIMED_OUT,
        ErrorCode::StoreError | ErrorCode::LogError => STORE_OR_LOG_FAILURE,
        ErrorCode::CapabilityDenied => CAPABILITY_DENIED,
        _ => GENERIC_FAILURE,
    }
}
/// Prints a fetch error to stderr.
///
/// The first line is `error[<wire code>]: <error>`. When the error converts
/// into a [`DenialContext`], a second `= note:` line names what was attempted
/// (`(unknown)` if not recorded) and, when the list of expected values is
/// present and non-empty, what would have been allowed.
fn render_fetch_error(e: &FetchError) {
    let code: ErrorCode = e.into();
    print_err(format_args!("error[{}]: {}", code.as_wire(), e));

    let dc = match Option::<DenialContext>::from(e) {
        Some(dc) => dc,
        None => return,
    };

    let attempted = dc.attempted.as_deref().unwrap_or("(unknown)");
    let note = match &dc.expected {
        Some(exp) if !exp.is_empty() => format!(
            " = note: attempted {attempted}; allowed: {}",
            exp.join(", ")
        ),
        _ => format!(" = note: attempted {attempted}"),
    };
    print_err(format_args!("{note}"));
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// A session id is 26 characters long and purely ASCII alphanumeric.
    #[test]
    fn new_session_id_is_26_chars() {
        let id = new_session_id();
        let only_alnum = id.bytes().all(|b| b.is_ascii_alphanumeric());

        assert_eq!(id.len(), 26, "session id must be 26 chars: {:?}", id);
        assert!(only_alnum, "ulid must be ASCII alphanumeric: {:?}", id);
    }

    /// Compile-time check that the core outcome type stays nameable from this
    /// crate; the returned name itself is irrelevant.
    #[test]
    fn fetch_paper_outcome_is_reachable_from_cli() {
        let _type_name = std::any::type_name::<doiget_core::orchestrator::FetchPaperOutcome>();
    }
}