use async_trait::async_trait;
use bytes::Bytes;
use quick_xml::events::Event;
use quick_xml::Reader;
use serde_json::{json, Value};
use url::Url;
use crate::provenance::{Capability, LogEvent, LogResult, RowInput};
use crate::source::{FetchContext, FetchError, FetchResult, Source};
use crate::{ArxivId, CapabilityProfile, Ref};
/// Default base URL used by `ArxivSource::new`.
///
/// Despite the name, both the PDF path (`/pdf/...`) and the metadata query
/// path (`/api/query`) are joined onto the source's base URL.
const PDF_BASE: &str = "https://arxiv.org";
/// Fetch source for arXiv references: downloads the PDF and, on a
/// best-effort basis, the Atom metadata for an arXiv identifier.
#[derive(Clone, Debug)]
pub struct ArxivSource {
/// Base URL onto which the absolute `/pdf/...` and `/api/query` paths are joined.
base: Url,
}
impl ArxivSource {
    /// Builds a source that targets the hard-coded [`PDF_BASE`] URL.
    ///
    /// # Panics
    ///
    /// Panics only if the compile-time `PDF_BASE` literal is not a valid URL,
    /// which would be a programming error.
    #[allow(clippy::expect_used)]
    pub fn new() -> Self {
        Self::with_base(Url::parse(PDF_BASE).expect("hard-coded base URL is valid"))
    }

    /// Builds a source that targets an arbitrary base URL (the tests point
    /// this at a local mock server).
    pub fn with_base(base: Url) -> Self {
        Self { base }
    }

    /// Returns `<base>/pdf/<id>.pdf`.
    ///
    /// # Errors
    ///
    /// `FetchError::SourceSchema` when the path cannot be joined onto the base.
    fn pdf_url(&self, id: &ArxivId) -> Result<Url, FetchError> {
        let relative = format!("/pdf/{}.pdf", id.as_str());
        match self.base.join(&relative) {
            Ok(url) => Ok(url),
            Err(e) => Err(FetchError::SourceSchema {
                hint: format!("arxiv URL construction failed: {e}"),
            }),
        }
    }

    /// Returns `<base>/api/query?id_list=<id>`.
    ///
    /// # Errors
    ///
    /// `FetchError::SourceSchema` when the path cannot be joined onto the base.
    fn metadata_url(&self, id: &ArxivId) -> Result<Url, FetchError> {
        let mut endpoint = match self.base.join("/api/query") {
            Ok(url) => url,
            Err(e) => {
                return Err(FetchError::SourceSchema {
                    hint: format!("arxiv metadata URL construction failed: {e}"),
                });
            }
        };
        endpoint
            .query_pairs_mut()
            .append_pair("id_list", id.as_str());
        Ok(endpoint)
    }

    /// Fetches and parses only the Atom metadata for `id`; no PDF is downloaded.
    ///
    /// On success one `Fetch`/`Ok` row with `Capability::Metadata` is appended
    /// to the provenance log, and the parsed metadata object is returned.
    ///
    /// # Errors
    ///
    /// Propagates URL construction, HTTP, Atom parsing and log-append failures.
    pub async fn fetch_metadata_only(
        &self,
        id: &ArxivId,
        ctx: &FetchContext,
    ) -> Result<Value, FetchError> {
        let source_name = self.name();
        // The permit stays bound for the rest of the call.
        let _permit = ctx.rate_limiter.acquire(source_name).await;

        let query_url = self.metadata_url(id)?;
        let (body, _final_url) = ctx.http.fetch_bytes(source_name, query_url).await?;
        let metadata = parse_atom_feed(&body)?;

        let canonical =
            crate::CanonicalRef::new(crate::SourceType::Arxiv, id.as_str(), source_name, None)
                .digest_hex();
        let row = RowInput {
            event: LogEvent::Fetch,
            result: LogResult::Ok,
            capability: Capability::Metadata,
            ref_: Some(id.as_str()),
            source: Some(source_name),
            error_code: None,
            size_bytes: Some(body.len() as u64),
            license: Some("arxiv-default"),
            store_path: None,
            canonical_digest: Some(&canonical),
        };
        ctx.log.append(row)?;

        Ok(metadata)
    }
}
impl Default for ArxivSource {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Source for ArxivSource {
fn name(&self) -> &str {
"arxiv"
}
fn can_serve(&self, _profile: &CapabilityProfile, ref_: &Ref) -> bool {
matches!(ref_, Ref::Arxiv(_))
}
async fn fetch(
&self,
ref_: &Ref,
_profile: &CapabilityProfile,
ctx: &FetchContext,
) -> Result<FetchResult, FetchError> {
let id = match ref_ {
Ref::Arxiv(a) => a,
Ref::Doi(_) => {
return Err(FetchError::NotEligible {
source_key: "arxiv".into(),
});
}
};
let _permit = ctx.rate_limiter.acquire(self.name()).await;
let metadata_json = match self.metadata_url(id) {
Ok(meta_url) => match ctx.http.fetch_bytes(self.name(), meta_url).await {
Ok((bytes, _final)) => match parse_atom_feed(&bytes) {
Ok(v) => Some(v),
Err(e) => {
tracing::warn!(
arxiv_id = %id.as_str(),
error = %e,
"arxiv Atom feed parse failed; continuing with PDF-only fetch"
);
None
}
},
Err(e) => {
tracing::warn!(
arxiv_id = %id.as_str(),
error = %e,
"arxiv Atom feed fetch failed; continuing with PDF-only fetch"
);
None
}
},
Err(e) => {
tracing::warn!(
arxiv_id = %id.as_str(),
error = %e,
"arxiv metadata URL construction failed; continuing with PDF-only fetch"
);
None
}
};
let url = self.pdf_url(id)?;
let (body, final_url): (Bytes, Url) = ctx.http.fetch_pdf(self.name(), url).await?;
let canonical = ref_.promote(self.name(), None).digest_hex();
ctx.log.append(RowInput {
event: LogEvent::Fetch,
result: LogResult::Ok,
capability: Capability::Oa,
ref_: Some(id.as_str()),
source: Some(self.name()),
error_code: None,
size_bytes: Some(body.len() as u64),
license: Some("arxiv-default"),
store_path: None,
canonical_digest: Some(&canonical),
})?;
Ok(FetchResult {
source: self.name().to_string(),
license: "arxiv-default".into(),
pdf_bytes: Some(body),
final_url: Some(final_url),
metadata_json,
})
}
}
pub(crate) fn parse_atom_feed(xml: &[u8]) -> Result<Value, FetchError> {
let mut reader = Reader::from_reader(xml);
let config = reader.config_mut();
config.trim_text(true);
let mut in_entry = false;
let mut saw_entry = false;
let mut depth = 0_i32;
let mut title: Option<String> = None;
let mut abstract_: Option<String> = None;
let mut published: Option<String> = None;
let mut updated: Option<String> = None;
let mut authors: Vec<String> = Vec::new();
let mut categories: Vec<String> = Vec::new();
#[derive(Clone, Copy)]
enum Target {
Title,
Summary,
Published,
Updated,
AuthorName,
}
let mut target: Option<Target> = None;
let mut in_author = false;
let mut buf: Vec<u8> = Vec::new();
loop {
match reader.read_event_into(&mut buf) {
Ok(Event::Start(e)) => {
let name_bytes = e.name();
let local = local_name(name_bytes.as_ref());
if !in_entry {
if local == b"entry" {
in_entry = true;
saw_entry = true;
depth = 0;
}
buf.clear();
continue;
}
depth += 1;
if depth == 1 {
match local {
b"title" => target = Some(Target::Title),
b"summary" => target = Some(Target::Summary),
b"published" => target = Some(Target::Published),
b"updated" => target = Some(Target::Updated),
b"author" => {
in_author = true;
authors.push(String::new());
}
_ => {}
}
} else if depth == 2 && in_author && local == b"name" {
target = Some(Target::AuthorName);
}
buf.clear();
}
Ok(Event::Empty(e)) => {
let name_bytes = e.name();
let local = local_name(name_bytes.as_ref());
if in_entry && depth == 0 && local == b"category" {
for attr in e.attributes().flatten() {
if attr.key.as_ref() == b"term" {
if let Ok(v) = attr.unescape_value() {
categories.push(v.into_owned());
}
}
}
}
buf.clear();
}
Ok(Event::Text(t)) => {
if let Some(tg) = target {
if let Ok(s) = t.unescape() {
let s = s.into_owned();
match tg {
Target::Title => title.get_or_insert_with(String::new).push_str(&s),
Target::Summary => {
abstract_.get_or_insert_with(String::new).push_str(&s)
}
Target::Published => {
published.get_or_insert_with(String::new).push_str(&s)
}
Target::Updated => updated.get_or_insert_with(String::new).push_str(&s),
Target::AuthorName => {
if let Some(last) = authors.last_mut() {
last.push_str(&s);
}
}
}
}
}
buf.clear();
}
Ok(Event::End(e)) => {
if !in_entry {
buf.clear();
continue;
}
let name_bytes = e.name();
let local = local_name(name_bytes.as_ref());
if depth == 0 && local == b"entry" {
break;
}
depth -= 1;
if depth == 0 {
if local == b"author" {
in_author = false;
if let Some(last) = authors.last() {
if last.is_empty() {
authors.pop();
}
}
}
target = None;
} else if depth == 1 && in_author && local == b"name" {
target = None;
}
buf.clear();
}
Ok(Event::Eof) => break,
Err(e) => {
return Err(FetchError::SourceSchema {
hint: format!("arxiv Atom XML parse error: {e}"),
});
}
_ => {
buf.clear();
}
}
}
if !saw_entry {
return Err(FetchError::SourceSchema {
hint: "arxiv Atom feed had no <entry> element (unknown id?)".into(),
});
}
let mut obj = serde_json::Map::new();
if let Some(t) = title {
let trimmed = t.trim().to_string();
if !trimmed.is_empty() {
obj.insert("title".into(), Value::String(trimmed));
}
}
if let Some(a) = abstract_ {
let trimmed = a.trim().to_string();
if !trimmed.is_empty() {
obj.insert("abstract".into(), Value::String(trimmed));
}
}
if !authors.is_empty() {
obj.insert(
"authors".into(),
Value::Array(authors.into_iter().map(Value::String).collect()),
);
}
if let Some(p) = published {
let trimmed = p.trim().to_string();
if !trimmed.is_empty() {
obj.insert("published".into(), Value::String(trimmed));
}
}
if let Some(u) = updated {
let trimmed = u.trim().to_string();
if !trimmed.is_empty() {
obj.insert("updated".into(), Value::String(trimmed));
}
}
if !categories.is_empty() {
obj.insert(
"categories".into(),
Value::Array(categories.into_iter().map(Value::String).collect()),
);
}
Ok(json!(obj))
}
/// Strips a namespace prefix from a qualified XML name.
///
/// Returns the bytes following the last `:` in `qname`, or all of `qname`
/// when it contains no `:`.
fn local_name(qname: &[u8]) -> &[u8] {
    // `rsplit` always yields at least one piece; the first one is whatever
    // follows the final colon (or the whole input when there is none).
    qname.rsplit(|&byte| byte == b':').next().unwrap_or(qname)
}
/// Unit and wiremock-backed tests for [`ArxivSource`] and [`parse_atom_feed`].
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use camino::Utf8PathBuf;
    use tempfile::TempDir;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};
    use crate::http::{HttpClient, HttpError};
    use crate::provenance::{LogRow, ProvenanceLog};
    use crate::rate_limiter::RateLimiter;
    use crate::source::FetchContext;
    use crate::{ArxivId, CapabilityProfile, Doi, RateLimits, Ref};

    const TEST_SESSION_ID: &str = "01J0000000000000000000TEST";

    /// Builds a `FetchContext` whose HTTP client is allowed to talk plain
    /// HTTP to `wiremock_host`, logging into a fresh temp directory. The
    /// `TempDir` is returned so the caller keeps it alive for the test.
    fn build_test_context(wiremock_host: &str) -> (TempDir, FetchContext) {
        let td = TempDir::new().expect("tempdir");
        let log_dir =
            Utf8PathBuf::try_from(td.path().to_path_buf()).expect("temp dir path must be UTF-8");
        let log_path = log_dir.join("test.jsonl");
        let http = Arc::new(HttpClient::new_for_tests_allow_http("arxiv", wiremock_host));
        let rate_limiter = Arc::new(RateLimiter::new(RateLimits::HARD_CODED));
        let session_id = TEST_SESSION_ID.to_string();
        let log = Arc::new(
            ProvenanceLog::open(log_path, session_id.clone()).expect("provenance log opens"),
        );
        (
            td,
            FetchContext {
                http,
                rate_limiter,
                log,
                session_id,
            },
        )
    }

    /// Host component of the mock server's URI.
    fn mock_host(server: &MockServer) -> String {
        server
            .uri()
            .parse::<Url>()
            .unwrap()
            .host_str()
            .unwrap()
            .to_string()
    }

    /// Test context plus an `ArxivSource` both pointed at `server`.
    fn harness(server: &MockServer) -> (TempDir, FetchContext, ArxivSource) {
        let (td, ctx) = build_test_context(&mock_host(server));
        let source = ArxivSource::with_base(server.uri().parse().unwrap());
        (td, ctx, source)
    }

    /// Mounts a `GET <route>` mock on `server` answering with `response`.
    async fn mount_get(server: &MockServer, route: &str, response: ResponseTemplate) {
        Mock::given(method("GET"))
            .and(path(route))
            .respond_with(response)
            .mount(server)
            .await;
    }

    /// Parses `raw` as an arXiv id and wraps it in `Ref::Arxiv`.
    fn arxiv_ref(raw: &str) -> Ref {
        Ref::Arxiv(ArxivId::parse(raw).expect("valid arXiv id"))
    }

    /// Reads every non-empty line of the JSONL log at `path` as a `LogRow`.
    fn read_rows(path: &camino::Utf8Path) -> Vec<LogRow> {
        let raw = std::fs::read_to_string(path).expect("read log");
        raw.lines()
            .filter(|l| !l.is_empty())
            .map(|l| serde_json::from_str::<LogRow>(l).expect("valid LogRow"))
            .collect()
    }

    fn profile() -> CapabilityProfile {
        CapabilityProfile::from_env().expect("Phase 0 stub profile")
    }

    #[test]
    fn arxiv_can_serve_returns_true_for_arxiv() {
        let s = ArxivSource::new();
        let r = arxiv_ref("2401.12345");
        assert!(s.can_serve(&profile(), &r));
    }

    #[test]
    fn arxiv_can_serve_returns_false_for_doi() {
        let s = ArxivSource::new();
        let r = Ref::Doi(Doi("10.1234/example".to_string()));
        assert!(!s.can_serve(&profile(), &r));
    }

    #[tokio::test]
    async fn arxiv_fetch_new_style_id_returns_pdf_bytes() {
        let server = MockServer::start().await;
        let body = b"%PDF-1.7\n%fixture\n".to_vec();
        mount_get(
            &server,
            "/pdf/2401.12345.pdf",
            ResponseTemplate::new(200).set_body_bytes(body.clone()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let r = arxiv_ref("2401.12345");
        let res = s.fetch(&r, &profile(), &ctx).await.expect("fetch ok");
        assert_eq!(res.source, "arxiv");
        assert_eq!(res.license, "arxiv-default");
        let bytes = res.pdf_bytes.expect("pdf bytes set");
        assert!(
            bytes.starts_with(b"%PDF-"),
            "expected PDF magic prefix, got {:?}",
            &bytes[..bytes.len().min(8)]
        );
        assert_eq!(&bytes[..], &body[..]);
    }

    #[tokio::test]
    async fn arxiv_fetch_old_style_id_returns_pdf_bytes() {
        let server = MockServer::start().await;
        let body = b"%PDF-1.4\n%old-style fixture\n".to_vec();
        mount_get(
            &server,
            "/pdf/cond-mat/9501001.pdf",
            ResponseTemplate::new(200).set_body_bytes(body.clone()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let id = ArxivId::parse("cond-mat/9501001").expect("old-style id");
        let r = Ref::Arxiv(id);
        let res = s.fetch(&r, &profile(), &ctx).await.expect("fetch ok");
        let bytes = res.pdf_bytes.expect("pdf bytes set");
        assert!(bytes.starts_with(b"%PDF-"));
        assert_eq!(&bytes[..], &body[..]);
    }

    #[tokio::test]
    async fn arxiv_fetch_with_doi_ref_errors_not_eligible() {
        let server = MockServer::start().await;
        let (_td, ctx, s) = harness(&server);
        let r = Ref::Doi(Doi("10.1234/example".to_string()));
        let err = s
            .fetch(&r, &profile(), &ctx)
            .await
            .expect_err("doi ref must not be eligible");
        match err {
            FetchError::NotEligible { source_key } => {
                assert_eq!(source_key, "arxiv");
            }
            other => panic!("expected NotEligible, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn arxiv_fetch_writes_log_row_with_arxiv_default_license() {
        let server = MockServer::start().await;
        let body = b"%PDF-1.7\n%log-row fixture\n".to_vec();
        mount_get(
            &server,
            "/pdf/2401.12345.pdf",
            ResponseTemplate::new(200).set_body_bytes(body.clone()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let log_path = ctx.log.path().to_path_buf();
        let r = arxiv_ref("2401.12345");
        let _ = s.fetch(&r, &profile(), &ctx).await.expect("fetch ok");
        let rows = read_rows(&log_path);
        assert_eq!(rows.len(), 1, "exactly one fetch row expected");
        let row = &rows[0];
        assert_eq!(row.source.as_deref(), Some("arxiv"));
        assert_eq!(row.ref_.as_deref(), Some("2401.12345"));
        assert_eq!(row.license.as_deref(), Some("arxiv-default"));
        assert_eq!(row.size_bytes, Some(body.len() as u64));
        assert!(row.error_code.is_none());
    }

    #[tokio::test]
    async fn arxiv_non_pdf_body_rejected() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            "/pdf/2401.12345.pdf",
            ResponseTemplate::new(200).set_body_bytes(b"<html>not a pdf</html>".to_vec()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let r = arxiv_ref("2401.12345");
        let err = s
            .fetch(&r, &profile(), &ctx)
            .await
            .expect_err("non-pdf body must be rejected");
        match err {
            FetchError::Http(HttpError::NotAPdf { got }) => {
                assert_eq!(&got, b"<html");
            }
            other => panic!("expected FetchError::Http(NotAPdf), got {:?}", other),
        }
    }

    #[tokio::test]
    async fn arxiv_404_maps_to_http_error() {
        let server = MockServer::start().await;
        mount_get(&server, "/pdf/2401.99999.pdf", ResponseTemplate::new(404)).await;
        let (_td, ctx, s) = harness(&server);
        let r = arxiv_ref("2401.99999");
        let err = s
            .fetch(&r, &profile(), &ctx)
            .await
            .expect_err("404 must surface");
        match err {
            FetchError::Http(HttpError::HttpStatus { status, .. }) => {
                assert_eq!(status, 404);
            }
            other => panic!("expected FetchError::Http(HttpStatus), got {:?}", other),
        }
    }

    const SAMPLE_ATOM_FEED: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<id>http://arxiv.org/abs/2401.12345v1</id>
<updated>2024-02-01T00:00:00Z</updated>
<published>2024-01-15T00:00:00Z</published>
<title>Example arXiv Paper Title</title>
<summary>This is an example abstract.</summary>
<author>
<name>Jane Doe</name>
</author>
<author>
<name>John Roe</name>
</author>
<category term="cs.LG" scheme="http://arxiv.org/schemas/atom"/>
<category term="stat.ML" scheme="http://arxiv.org/schemas/atom"/>
</entry>
</feed>"#;

    #[test]
    fn parse_atom_feed_extracts_all_fields() {
        let v = parse_atom_feed(SAMPLE_ATOM_FEED.as_bytes()).expect("Atom parses");
        assert_eq!(v["title"], serde_json::json!("Example arXiv Paper Title"));
        assert_eq!(
            v["abstract"],
            serde_json::json!("This is an example abstract.")
        );
        assert_eq!(v["authors"], serde_json::json!(["Jane Doe", "John Roe"]));
        assert_eq!(v["published"], serde_json::json!("2024-01-15T00:00:00Z"));
        assert_eq!(v["updated"], serde_json::json!("2024-02-01T00:00:00Z"));
        assert_eq!(v["categories"], serde_json::json!(["cs.LG", "stat.ML"]));
    }

    #[test]
    fn parse_atom_feed_empty_feed_errors_source_schema() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom"></feed>"#;
        let err = parse_atom_feed(xml.as_bytes()).expect_err("empty feed must error");
        match err {
            FetchError::SourceSchema { hint } => {
                assert!(
                    hint.contains("entry"),
                    "expected mention of <entry>; got {hint}"
                );
            }
            other => panic!("expected SourceSchema, got {other:?}"),
        }
    }

    #[test]
    fn parse_atom_feed_omits_missing_optional_fields() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
<entry>
<id>http://arxiv.org/abs/2401.00001v1</id>
<title>Minimal Entry</title>
</entry>
</feed>"#;
        let v = parse_atom_feed(xml.as_bytes()).expect("parses");
        let obj = v.as_object().expect("object");
        assert_eq!(
            obj.get("title").and_then(Value::as_str),
            Some("Minimal Entry")
        );
        assert!(
            !obj.contains_key("abstract"),
            "abstract should be omitted: {obj:?}"
        );
        assert!(
            !obj.contains_key("authors"),
            "authors should be omitted: {obj:?}"
        );
        assert!(
            !obj.contains_key("categories"),
            "categories should be omitted: {obj:?}"
        );
    }

    #[tokio::test]
    async fn arxiv_fetch_metadata_only_returns_atom_metadata() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            "/api/query",
            ResponseTemplate::new(200).set_body_string(SAMPLE_ATOM_FEED),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let id = ArxivId::parse("2401.12345").unwrap();
        let meta = s
            .fetch_metadata_only(&id, &ctx)
            .await
            .expect("metadata_only ok");
        assert_eq!(
            meta["title"],
            serde_json::json!("Example arXiv Paper Title")
        );
        assert_eq!(meta["authors"], serde_json::json!(["Jane Doe", "John Roe"]));
    }

    #[tokio::test]
    async fn arxiv_fetch_populates_metadata_json_when_atom_endpoint_mocked() {
        let server = MockServer::start().await;
        mount_get(
            &server,
            "/api/query",
            ResponseTemplate::new(200).set_body_string(SAMPLE_ATOM_FEED),
        )
        .await;
        mount_get(
            &server,
            "/pdf/2401.12345.pdf",
            ResponseTemplate::new(200).set_body_bytes(b"%PDF-1.7\n%fix\n".to_vec()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let r = arxiv_ref("2401.12345");
        let res = s.fetch(&r, &profile(), &ctx).await.expect("fetch ok");
        let meta = res.metadata_json.expect("metadata_json populated");
        assert_eq!(
            meta["title"],
            serde_json::json!("Example arXiv Paper Title")
        );
    }

    #[tokio::test]
    async fn arxiv_fetch_atom_failure_falls_back_to_pdf_only() {
        let server = MockServer::start().await;
        // No `/api/query` mock is mounted, so the metadata request cannot succeed.
        mount_get(
            &server,
            "/pdf/2401.12345.pdf",
            ResponseTemplate::new(200).set_body_bytes(b"%PDF-1.7\nx".to_vec()),
        )
        .await;
        let (_td, ctx, s) = harness(&server);
        let r = arxiv_ref("2401.12345");
        let res = s.fetch(&r, &profile(), &ctx).await.expect("fetch ok");
        assert!(res.metadata_json.is_none());
        assert!(res.pdf_bytes.is_some());
    }
}