use std::{sync::Arc, time::Duration};
use clap::{crate_name, crate_version};
use feed_rs::{model::Feed, parser};
use jiff::Timestamp;
use reqwest::{
StatusCode, {Client, ClientBuilder},
};
use tracing::{debug, warn};
use url::Url;
use crate::{
cache::{Cache, CacheValue},
error::OpenringError,
};
/// Something that can be fetched and parsed as a feed, given access to the
/// shared response cache.
pub(crate) trait FeedFetcher {
/// Fetches the feed, using `cache` as the response cache, and returns the
/// parsed [`Feed`] or an [`OpenringError`] describing why none is available.
async fn fetch_feed(&self, cache: &Arc<Cache>) -> Result<Feed, OpenringError>;
}
/// Returns `s` in quoted entity-tag form.
///
/// A value that already ends with `"` and starts with either `"` (strong
/// validator) or `W/"` (weak validator) is returned unchanged; anything else
/// is wrapped in a pair of double quotes.
#[must_use]
pub fn normalize_etag(s: &str) -> String {
    let closes_quote = s.ends_with('"');
    let opens_strong = s.starts_with('"');
    let opens_weak = s.starts_with("W/\"");

    if closes_quote && (opens_strong || opens_weak) {
        return s.to_owned();
    }
    format!("\"{s}\"")
}
pub(crate) mod logic {
use jiff::{Span, Timestamp, ToSpan};
use reqwest::StatusCode;
use crate::cache::{CacheValue, MAX_SPAN_SEC};
/// Values for the conditional request headers derived from a cache entry.
/// Both fields are `None` when there is no cache entry to derive them from.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub(crate) struct ConditionalHeaders {
/// Value to send as `If-Modified-Since` (the entry's `last_modified`).
pub if_modified_since: Option<String>,
/// Value to send as `If-None-Match` (the entry's `etag`).
pub if_none_match: Option<String>,
}
/// What to do with a response, as classified by `disposition`.
#[derive(Debug, Clone)]
pub(crate) enum Disposition {
/// The response was a success (2xx), or a 304 received while no cache
/// entry existed: carries the response validators and body.
Store {
etag: Option<String>,
last_modified: Option<String>,
body: Option<String>,
},
/// 304 Not Modified while a cache entry exists.
Reuse,
/// 429 Too Many Requests while a cache entry exists; `retry_after` is the
/// span produced by `parse_retry_after` from the `Retry-After` header.
RateLimited { retry_after: Span },
/// 429 Too Many Requests with no cache entry.
RateLimitedNoCache,
/// Any other status; `status` is the code as rendered by `StatusCode::as_str`.
Unexpected { status: String },
}
/// Reports whether the rate-limit back-off recorded on `cv` is still in force
/// at `now`, i.e. whether `cv.timestamp + cv.retry_after` lies after `now`.
///
/// Returns `false` when the entry has no `retry_after`.
///
/// The deadline is computed with checked arithmetic: `Timestamp + Span`
/// panics when the result is out of range, which a very large stored span or
/// a far-future cached timestamp can trigger. When the deadline cannot be
/// represented, a positive span is treated as "deadline still ahead" and any
/// other span as "deadline already passed".
pub(crate) fn retry_after_gate_open(cv: &CacheValue, now: Timestamp) -> bool {
    cv.retry_after.is_some_and(|retry| {
        cv.timestamp
            .checked_add(retry)
            .map_or_else(|_| retry.is_positive(), |deadline| deadline > now)
    })
}
/// Builds the conditional request headers for an optional cache entry.
///
/// With an entry, `last_modified` becomes `if_modified_since` and `etag`
/// becomes `if_none_match`; without one, both header values are `None`.
pub(crate) fn conditional_headers(cv: Option<&CacheValue>) -> ConditionalHeaders {
    match cv {
        Some(entry) => ConditionalHeaders {
            if_modified_since: entry.last_modified.clone(),
            if_none_match: entry.etag.clone(),
        },
        None => ConditionalHeaders::default(),
    }
}
/// Converts a `Retry-After` header value into a back-off span.
///
/// A value that parses as an `i64` is read as seconds and clamped to
/// `-MAX_SPAN_SEC..=MAX_SPAN_SEC`. A missing header, or any value that is not
/// an integer, yields the default of four hours.
pub(crate) fn parse_retry_after(header: Option<&str>) -> Span {
    let parsed_secs = header.and_then(|raw| raw.parse::<i64>().ok());
    match parsed_secs {
        Some(secs) => secs.clamp(-MAX_SPAN_SEC, MAX_SPAN_SEC).seconds(),
        None => 4.hours(),
    }
}
/// Classifies a response into the action to take on the cache.
///
/// * 304 with a cache entry -> `Reuse`
/// * 2xx, or 304 without a cache entry -> `Store` with the given validators and body
/// * 429 with a cache entry -> `RateLimited`, using `retry_after_header`
/// * 429 without a cache entry -> `RateLimitedNoCache`
/// * anything else -> `Unexpected` carrying the status code as a string
pub(crate) fn disposition(
    status: StatusCode,
    etag: Option<&str>,
    last_modified: Option<&str>,
    body: Option<String>,
    had_cache_entry: bool,
    retry_after_header: Option<&str>,
) -> Disposition {
    let not_modified = status == StatusCode::NOT_MODIFIED;

    if not_modified && had_cache_entry {
        return Disposition::Reuse;
    }

    if not_modified || status.is_success() {
        return Disposition::Store {
            etag: etag.map(str::to_owned),
            last_modified: last_modified.map(str::to_owned),
            body,
        };
    }

    if status != StatusCode::TOO_MANY_REQUESTS {
        return Disposition::Unexpected {
            status: status.as_str().to_owned(),
        };
    }

    // Rate limited: the header is only interpreted when there is an entry to
    // attach the back-off to.
    if had_cache_entry {
        Disposition::RateLimited {
            retry_after: parse_retry_after(retry_after_header),
        }
    } else {
        Disposition::RateLimitedNoCache
    }
}
}
/// Parses `feed_str` as a feed.
///
/// On failure the error is logged as a warning together with `url` and then
/// converted into an [`OpenringError`].
fn parse_feed(url: &Url, feed_str: &str) -> Result<Feed, OpenringError> {
    match parser::parse(feed_str.as_bytes()) {
        Ok(feed) => Ok(feed),
        Err(e) => {
            warn!(url=%url.as_str(), error=%e, "failed to parse feed.");
            Err(OpenringError::from(e))
        }
    }
}
/// Applies `disposition` to the cache entry for `url` and returns the feed
/// text that should be parsed.
///
/// * `Store` – writes the validators and body into the entry (creating it if
///   needed), stamps it with `now`, clears any back-off and returns the body.
///   A `Store` without a body leaves the cache untouched.
/// * `Reuse` – stamps the entry with `now`, clears any back-off and returns
///   the cached body.
/// * `RateLimited` – stamps the entry with `now`, records the back-off and
///   returns the cached body.
///
/// # Errors
///
/// * [`OpenringError::EmptyFeedError`] when no body is available (neither in
///   the response nor in the cache entry).
/// * [`OpenringError::RateLimitError`] for `RateLimitedNoCache`.
/// * [`OpenringError::UnexpectedStatusError`] for `Unexpected`.
fn apply_disposition(
    url: &Url,
    cache: &Cache,
    now: Timestamp,
    disposition: logic::Disposition,
) -> Result<String, OpenringError> {
    let empty_feed = || OpenringError::EmptyFeedError(url.as_str().to_string());

    match disposition {
        logic::Disposition::Store {
            etag,
            last_modified,
            body,
        } => {
            // Without a body there is nothing worth caching. Recording fresh
            // validators next to a missing body would make every later 304
            // resolve to an entry with no body, and would discard a
            // previously cached good body.
            let Some(text) = body else {
                return Err(empty_feed());
            };
            if let Some(mut cv) = cache.get_mut(url) {
                cv.etag = etag;
                cv.last_modified = last_modified;
                cv.body = Some(text.clone());
                cv.timestamp = now;
                cv.retry_after = None;
            } else {
                cache.insert(
                    url.clone(),
                    CacheValue {
                        timestamp: now,
                        retry_after: None,
                        etag,
                        last_modified,
                        body: Some(text.clone()),
                    },
                );
            }
            Ok(text)
        }
        logic::Disposition::Reuse => cache
            .get_mut(url)
            .and_then(|mut cv| {
                cv.timestamp = now;
                // The server answered normally, so an earlier back-off no
                // longer applies. Leaving it in place next to the refreshed
                // timestamp would re-arm the rate-limit gate.
                cv.retry_after = None;
                cv.body.clone()
            })
            .ok_or_else(empty_feed),
        logic::Disposition::RateLimited { retry_after } => cache
            .get_mut(url)
            .and_then(|mut cv| {
                cv.timestamp = now;
                cv.retry_after = Some(retry_after);
                cv.body.clone()
            })
            .ok_or_else(empty_feed),
        logic::Disposition::RateLimitedNoCache => {
            Err(OpenringError::RateLimitError(url.as_str().to_string()))
        }
        logic::Disposition::Unexpected { status } => Err(OpenringError::UnexpectedStatusError {
            url: url.as_str().to_string(),
            status,
        }),
    }
}
impl FeedFetcher for Url {
async fn fetch_feed(&self, cache: &Arc<Cache>) -> Result<Feed, OpenringError> {
let now = Timestamp::now();
let client: Client = ClientBuilder::new()
.timeout(Duration::from_secs(30))
.user_agent(concat!(crate_name!(), '/', crate_version!()))
.build()?;
let cached: Option<CacheValue> = cache.get(self).map(|e| e.value().clone());
if let Some(cv) = &cached
&& logic::retry_after_gate_open(cv, now)
{
debug!(timestamp=%cv.timestamp, retry_after=?cv.retry_after, "skipping request due to 429, using feed from cache");
if let Some(feed_str) = &cv.body {
return parse_feed(self, feed_str);
}
warn!(url=%self.as_str(), "empty feed");
}
let mut req = client.get(self.as_str());
let headers = logic::conditional_headers(cached.as_ref());
if let Some(last_modified) = &headers.if_modified_since {
req = req.header("If-Modified-Since", last_modified);
}
if let Some(etag) = &headers.if_none_match {
req = req.header("If-None-Match", etag);
}
debug!(url=%self, request=?req, "sending request");
let resp = match req.send().await {
Ok(resp) => resp,
Err(e) => {
warn!(url=%self.as_str(), error=%e, "failed to get feed.");
return Err(e.into());
}
};
debug!(url=%self, response=?resp, "received response");
let status = resp.status();
let etag = resp
.headers()
.get("etag")
.and_then(|v| v.to_str().ok())
.map(normalize_etag);
let last_modified = resp
.headers()
.get("last-modified")
.and_then(|v| v.to_str().ok())
.map(str::to_string);
let retry_after = resp
.headers()
.get("retry-after")
.and_then(|v| v.to_str().ok())
.map(str::to_string);
let body = if status.is_success() || status == StatusCode::NOT_MODIFIED {
resp.text().await.ok()
} else {
None
};
let disposition = logic::disposition(
status,
etag.as_deref(),
last_modified.as_deref(),
body,
cached.is_some(),
retry_after.as_deref(),
);
let feed_str = apply_disposition(self, cache, now, disposition)?;
parse_feed(self, &feed_str)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use jiff::{Span, Timestamp, ToSpan};
use reqwest::StatusCode;
use url::Url;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};
use hegel::extras::jiff as jiff_gs;
use hegel::generators;
use crate::cache::{Cache, CacheValue, MAX_SPAN_SEC};
use crate::error::OpenringError;
use super::{FeedFetcher, logic, normalize_etag};
// Upper bound, in seconds, for both the entry timestamp and the retry span
// drawn in `gate_open_iff_deadline_after_now`.
// NOTE(review): presumably sized so that timestamp + retry stays within the
// range `Timestamp` can represent — confirm.
const GATE_SECONDS_MAX: i64 = 50_000_000_000;
// Upper bound, in seconds, for the `now` value drawn in the same test.
const GATE_NOW_MAX: i64 = 200_000_000_000;
// Builds an RSS 2.0 document with one item whose channel title is `title`,
// so tests can tell feeds apart by title.
fn get_valid_rss_feed(title: &str) -> String {
format!(
r#"
<?xml version="1.0"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
<channel>
<title>{title}</title>
<link>http://www.nasa.gov/</link>
<description>A RSS news feed containing the latest NASA press releases on the International Space Station.</description>
<language>en-us</language>
<pubDate>Tue, 10 Jun 2003 04:00:00 GMT</pubDate>
<item>
<title>Louisiana Students to Hear from NASA Astronauts Aboard Space Station</title>
<link>http://www.nasa.gov/press-release/louisiana-students-to-hear-from-nasa-astronauts-aboard-space-station</link>
<description>As part of the state's first Earth-to-space call, students from Louisiana will have an opportunity soon to hear from NASA astronauts aboard the International Space Station.</description>
<pubDate>Fri, 21 Jul 2023 09:04 EDT</pubDate>
<guid>http://www.nasa.gov/press-release/louisiana-students-to-hear-from-nasa-astronauts-aboard-space-station</guid>
</item>
</channel>
</rss>
"#
)
}
// Generator of spans made of whole seconds: draws any i64 and clamps it into
// `0..=MAX_SPAN_SEC`.
#[hegel::composite]
fn retry_spans(tc: hegel::TestCase) -> Span {
let secs = tc.draw(generators::integers::<i64>());
Span::new().seconds(secs.clamp(0, MAX_SPAN_SEC))
}
// Generator of `CacheValue`s: a drawn timestamp plus independently optional
// retry span, last-modified, etag and body.
#[hegel::composite]
fn cache_values(tc: hegel::TestCase) -> CacheValue {
CacheValue {
timestamp: tc.draw(jiff_gs::timestamps()),
retry_after: tc.draw(generators::optional(retry_spans())),
last_modified: tc.draw(generators::optional(generators::text())),
etag: tc.draw(generators::optional(generators::text())),
body: tc.draw(generators::optional(generators::text())),
}
}
// `parse_retry_after` must return (not panic) for any optional text header;
// the result itself is not inspected.
#[hegel::test]
fn parse_retry_after_never_panics(tc: hegel::TestCase) {
let header = tc.draw(generators::optional(generators::text()));
let _ = logic::parse_retry_after(header.as_deref());
}
// An integer header yields a span whose seconds equal the value clamped to
// `-MAX_SPAN_SEC..=MAX_SPAN_SEC`.
#[hegel::test]
fn parse_retry_after_clamps_integer_seconds(tc: hegel::TestCase) {
let secs = tc.draw(generators::integers::<i64>());
let span = logic::parse_retry_after(Some(&secs.to_string()));
assert_eq!(span.get_seconds(), secs.clamp(-MAX_SPAN_SEC, MAX_SPAN_SEC));
}
// A header that is absent or does not parse as an i64 yields the four-hour
// default.
#[hegel::test]
fn parse_retry_after_defaults_when_not_seconds(tc: hegel::TestCase) {
let header = tc.draw(generators::optional(generators::text()));
// Discard the cases where the drawn text happens to be an integer.
tc.assume(
header
.as_deref()
.and_then(|s| s.parse::<i64>().ok())
.is_none(),
);
let span = logic::parse_retry_after(header.as_deref());
assert_eq!(span.fieldwise(), 4.hours().fieldwise());
}
// For bounded, non-negative inputs the gate is open exactly when `now` is
// strictly before `timestamp + retry_after`, compared in whole seconds.
#[hegel::test]
fn gate_open_iff_deadline_after_now(tc: hegel::TestCase) {
let ts_secs = tc.draw(
generators::integers::<i64>()
.min_value(0)
.max_value(GATE_SECONDS_MAX),
);
let retry_secs = tc.draw(
generators::integers::<i64>()
.min_value(0)
.max_value(GATE_SECONDS_MAX),
);
let now_secs = tc.draw(
generators::integers::<i64>()
.min_value(0)
.max_value(GATE_NOW_MAX),
);
let cv = CacheValue {
timestamp: Timestamp::from_second(ts_secs).unwrap(),
retry_after: Some(Span::new().seconds(retry_secs)),
last_modified: None,
etag: None,
body: None,
};
let now = Timestamp::from_second(now_secs).unwrap();
// Reference model of the deadline, computed on plain integers.
let deadline_secs = ts_secs + retry_secs;
assert_eq!(
logic::retry_after_gate_open(&cv, now),
now_secs < deadline_secs
);
}
// With no `retry_after` recorded, the gate is closed for any entry and any
// `now`.
#[hegel::test]
fn gate_closed_without_retry_after(tc: hegel::TestCase) {
let mut cv = tc.draw(cache_values());
cv.retry_after = None;
let now = tc.draw(jiff_gs::timestamps());
assert!(!logic::retry_after_gate_open(&cv, now));
}
// `conditional_headers` copies `last_modified` into `if_modified_since` and
// `etag` into `if_none_match`; both are `None` when there is no entry.
#[hegel::test]
fn conditional_headers_project_cache_fields(tc: hegel::TestCase) {
let cv = tc.draw(generators::optional(cache_values()));
let headers = logic::conditional_headers(cv.as_ref());
assert_eq!(
headers.if_modified_since,
cv.as_ref().and_then(|c| c.last_modified.clone())
);
assert_eq!(
headers.if_none_match,
cv.as_ref().and_then(|c| c.etag.clone())
);
}
// Any 2xx status is classified as `Store`, carrying etag, last-modified and
// body through unchanged, whether or not a cache entry existed.
#[hegel::test]
fn disposition_success_stores_response(tc: hegel::TestCase) {
let code = tc.draw(generators::integers::<u16>().min_value(200).max_value(299));
let status = StatusCode::from_u16(code).expect("2xx is valid");
let etag = tc.draw(generators::optional(generators::text()));
let last_modified = tc.draw(generators::optional(generators::text()));
let body = tc.draw(generators::optional(generators::text()));
let had_cache_entry = tc.draw(generators::booleans());
let disp = logic::disposition(
status,
etag.as_deref(),
last_modified.as_deref(),
body.clone(),
had_cache_entry,
None,
);
let logic::Disposition::Store {
etag: e,
last_modified: lm,
body: b,
} = disp
else {
panic!("expected Store, got {disp:?}");
};
assert_eq!(e, etag);
assert_eq!(lm, last_modified);
assert_eq!(b, body);
}
// 304 is classified as `Reuse` when a cache entry existed, otherwise as
// `Store` carrying the given body.
#[hegel::test]
fn disposition_not_modified_depends_on_cache(tc: hegel::TestCase) {
let body = tc.draw(generators::optional(generators::text()));
let had_cache_entry = tc.draw(generators::booleans());
let disp = logic::disposition(
StatusCode::NOT_MODIFIED,
None,
None,
body.clone(),
had_cache_entry,
None,
);
if had_cache_entry {
assert!(matches!(disp, logic::Disposition::Reuse));
} else {
let logic::Disposition::Store { body: b, .. } = disp else {
panic!("expected Store, got {disp:?}");
};
assert_eq!(b, body);
}
}
// 429 is classified as `RateLimited` (with the span `parse_retry_after`
// produces for the header) when a cache entry existed, otherwise as
// `RateLimitedNoCache`.
#[hegel::test]
fn disposition_too_many_requests_depends_on_cache(tc: hegel::TestCase) {
let retry_after = tc.draw(generators::optional(generators::text()));
let had_cache_entry = tc.draw(generators::booleans());
let disp = logic::disposition(
StatusCode::TOO_MANY_REQUESTS,
None,
None,
None,
had_cache_entry,
retry_after.as_deref(),
);
if had_cache_entry {
let logic::Disposition::RateLimited { retry_after: span } = disp else {
panic!("expected RateLimited, got {disp:?}");
};
assert_eq!(
span.fieldwise(),
logic::parse_retry_after(retry_after.as_deref()).fieldwise()
);
} else {
assert!(matches!(disp, logic::Disposition::RateLimitedNoCache));
}
}
// Every status in 100..=599 other than 2xx, 304 and 429 is classified as
// `Unexpected`, carrying the status code string.
#[hegel::test]
fn disposition_other_status_is_unexpected(tc: hegel::TestCase) {
let code = tc.draw(generators::integers::<u16>().min_value(100).max_value(599));
tc.assume(!(200..=299).contains(&code) && code != 304 && code != 429);
let status = StatusCode::from_u16(code).expect("100..=599 is valid");
let had_cache_entry = tc.draw(generators::booleans());
let disp = logic::disposition(status, None, None, None, had_cache_entry, None);
let logic::Disposition::Unexpected { status: s } = disp else {
panic!("expected Unexpected, got {disp:?}");
};
assert_eq!(s, status.as_str());
}
// With a cached entry that has validators and a server answering 304, the
// cached feed is returned and the request carries `If-None-Match` and
// `If-Modified-Since` with the cached values.
#[tokio::test]
async fn sends_conditional_headers_and_reuses_on_304() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(304))
.mount(&server)
.await;
let url = Url::parse(&server.uri()).unwrap();
let cache = Arc::new(Cache::new());
let etag = normalize_etag("abc123");
let last_modified = "Mon, 01 Jan 2024 00:00:00 GMT".to_string();
// Seed the cache so the fetcher has validators to send and a body to reuse.
cache.insert(
url.clone(),
CacheValue {
timestamp: Timestamp::now(),
retry_after: None,
last_modified: Some(last_modified.clone()),
etag: Some(etag.clone()),
body: Some(get_valid_rss_feed("cached")),
},
);
let feed = url.fetch_feed(&cache).await.expect("served cache on 304");
assert!(
feed.title
.as_ref()
.is_some_and(|t| t.content.contains("cached"))
);
// Inspect the request the mock server actually received.
let received = server.received_requests().await.unwrap();
let req = &received[0];
assert_eq!(
req.headers.get("if-none-match").unwrap().to_str().unwrap(),
etag
);
assert_eq!(
req.headers
.get("if-modified-since")
.unwrap()
.to_str()
.unwrap(),
last_modified
);
}
// A 200 response with `etag` and `last-modified` headers returns the parsed
// feed and leaves a cache entry holding the normalized etag and the
// last-modified value.
#[tokio::test]
async fn stores_etag_and_last_modified_on_200() {
let server = MockServer::start().await;
// Deliberately unquoted, so the stored value must have been normalized.
let etag_raw = "feed-etag";
let last_modified = "Mon, 01 Jan 2024 00:00:00 GMT";
Mock::given(method("GET"))
.and(path("/"))
.respond_with(
ResponseTemplate::new(200)
.append_header("etag", etag_raw)
.append_header("last-modified", last_modified)
.set_body_string(get_valid_rss_feed("fresh")),
)
.mount(&server)
.await;
let url = Url::parse(&server.uri()).unwrap();
let cache = Arc::new(Cache::new());
let feed = url.fetch_feed(&cache).await.expect("fetched fresh feed");
assert!(
feed.title
.as_ref()
.is_some_and(|t| t.content.contains("fresh"))
);
let entry = cache.get(&url).expect("cached after 200");
assert_eq!(
entry.etag.as_deref(),
Some(normalize_etag(etag_raw).as_str())
);
assert_eq!(entry.last_modified.as_deref(), Some(last_modified));
}
// A 429 with `retry-after: 120` while a cached body exists serves the cached
// feed and records a 120-second back-off on the entry.
#[tokio::test]
async fn rate_limited_serves_cache_and_records_retry_after() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(429).insert_header("retry-after", "120"))
.mount(&server)
.await;
let url = Url::parse(&server.uri()).unwrap();
let cache = Arc::new(Cache::new());
// Seed an entry without a back-off so the request is actually sent.
cache.insert(
url.clone(),
CacheValue {
timestamp: Timestamp::now(),
retry_after: None,
last_modified: None,
etag: None,
body: Some(get_valid_rss_feed("rate-limited")),
},
);
let feed = url.fetch_feed(&cache).await.expect("served cache on 429");
assert!(
feed.title
.as_ref()
.is_some_and(|t| t.content.contains("rate-limited"))
);
let entry = cache.get(&url).expect("entry present");
assert_eq!(entry.retry_after.unwrap().get_seconds(), 120);
}
// A 500 response with an empty cache surfaces as `UnexpectedStatusError`.
#[tokio::test]
async fn unexpected_status_is_an_error() {
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/"))
.respond_with(ResponseTemplate::new(500))
.mount(&server)
.await;
let url = Url::parse(&server.uri()).unwrap();
let cache = Arc::new(Cache::new());
let res = url.fetch_feed(&cache).await;
assert!(matches!(
res,
Err(OpenringError::UnexpectedStatusError { .. })
));
}
}