use jiff::Timestamp;
use sha2::{Digest, Sha256};
use url::Url;
use super::FetcherError;
use super::fetch::ConditionalGet;
use super::ssrf::SsrfLevel;
use super::ttl::{TtlDecision, compute_ttl};
use crate::config::CacheConfig;
use crate::extractor::metadata::ExtractedMetadata;
use crate::storage::Db;
use crate::storage::pages::{self, Page, url_hash};
/// How the page in a [`CachedFetch`] relates to the page cache.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CacheStatus {
/// The page is a cached entry that had not expired, or an expired entry the
/// origin confirmed with a `304` response.
Hit,
/// The page is an expired cached entry that was served anyway: either on the
/// stale-while-revalidate fast path, or as a fallback after the network fetch
/// failed while the entry was still inside the stale-while-revalidate window.
Stale {
/// Id of the revalidate task enqueued for the entry; `None` when the task
/// could not be serialized or inserted.
revalidation_task_id: Option<String>,
},
/// The page was produced by a fetch (or headless render) performed in this call.
Miss,
}
/// Result of [`fetch_with_cache`]: the page plus how the cache was involved.
#[derive(Debug, Clone)]
pub struct CachedFetch {
/// The page that was served, either loaded from storage or freshly built.
pub page: Page,
/// Whether `page` was a hit, a stale serve, or a fresh fetch.
pub cache_status: CacheStatus,
}
/// Selects whether [`fetch_with_cache`] uses the headless renderer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HeadlessMode {
/// Plain HTTP fetch only (the default).
#[default]
Off,
/// Always render through the headless renderer; fails when the `headless`
/// feature is not compiled in or no renderer was supplied.
On,
/// Plain HTTP fetch first; the page is re-rendered headlessly only when a
/// renderer is available and SPA detection reports at least two hits.
Auto,
}
/// Per-call knobs for [`fetch_with_cache`].
#[derive(Debug, Clone)]
pub struct FetchOptions {
/// Skip the cache lookup entirely and go straight to the network.
pub force_refresh: bool,
/// SSRF policy level forwarded to the robots loader, the HTTP fetch and the
/// headless renderer.
pub ssrf_level: SsrfLevel,
/// Optional project root forwarded alongside `ssrf_level` to the HTTP fetch
/// and the headless renderer.
pub ssrf_project_root: Option<std::path::PathBuf>,
/// Optional HAR recorder forwarded to the HTTP fetch.
pub har_recorder: Option<std::sync::Arc<crate::fetcher::har::HarRecorder>>,
/// When true, robots rules are not consulted for this call.
pub ignore_robots: bool,
/// User agent used when loading and evaluating robots rules; also reported
/// in the robots-disallowed error.
pub user_agent: String,
/// Headless renderer handle; only present when the `headless` feature is on.
#[cfg(feature = "headless")]
pub headless: Option<std::sync::Arc<crate::fetcher::headless::HeadlessRenderer>>,
/// Whether and when the headless renderer is used.
pub headless_mode: HeadlessMode,
/// When true, an expired entry inside the stale-while-revalidate window is
/// refetched inline instead of being served stale with a background task.
pub synchronous_revalidation: bool,
}
/// Output of the extraction callback handed to [`fetch_with_cache`].
#[derive(Debug, Clone)]
pub struct ExtractResult {
/// Page title, stored as the page's `title`.
pub title: Option<String>,
/// Extracted markdown body, stored as the page's `extracted_md` and fed to
/// SPA detection in `HeadlessMode::Auto`.
pub body_md: String,
/// Stored verbatim as the page's `content_hash`.
pub content_hash: String,
/// Extracted metadata; serialized to JSON for the page's `metadata_json`.
pub metadata: ExtractedMetadata,
}
/// Fetches `url` through the page cache, touching the network only when needed.
///
/// Steps, in order:
/// 1. Unless robots handling is skipped (`robots_cfg.respect` is false,
///    `opts.ignore_robots` is set, or the host is listed in
///    `robots_cfg.ignore_domains`), the host's robots entry is loaded and the
///    call fails if the URL path is disallowed for `opts.user_agent`.
/// 2. Unless `opts.force_refresh` is set, the cache is consulted. An unexpired
///    entry is returned as [`CacheStatus::Hit`]. An expired entry still inside
///    `cache_cfg.stale_while_revalidate_window` is returned as
///    [`CacheStatus::Stale`] after a revalidate task is enqueued, unless
///    `opts.synchronous_revalidation` is set.
/// 3. Otherwise the page is fetched over HTTP (sending the expired entry's
///    validators, if any) or rendered headlessly, per `opts.headless_mode`.
///    If the HTTP fetch fails and an expired entry is still inside the window,
///    that entry is served as stale instead of failing.
/// 4. A `304` refreshes the expired entry's timestamps and returns it as a hit.
///    A 2xx body is passed to `extract_fn`; in `HeadlessMode::Auto` the page may
///    be re-rendered and re-extracted when it looks like an SPA. The resulting
///    page is persisted only when the TTL decision allows caching and is
///    returned as [`CacheStatus::Miss`].
///
/// # Errors
///
/// - `FetcherError::Ssrf` when `url` has no host.
/// - `FetcherError::RobotsDisallowed` when robots rules forbid the path.
/// - `FetcherError::Status` for any non-2xx response, including a `304` that
///   arrives when there is no cached entry it could refer to.
/// - `FetcherError::Storage` when a database operation fails.
/// - `FetcherError::HeadlessFeatureNotCompiled` /
///   `FetcherError::HeadlessRendererUnavailable` for `HeadlessMode::On` without
///   a usable renderer.
/// - Any error returned by the robots loader, the fetch, the renderer or
///   `extract_fn` is propagated unchanged.
#[allow(clippy::too_many_arguments)]
pub async fn fetch_with_cache<F>(
    db: &Db,
    client: &reqwest::Client,
    pacer: &crate::fetcher::concurrency::Pacer,
    rate_cfg: &crate::config::RateLimitConfig,
    robots_cfg: &crate::config::RobotsConfig,
    url: &Url,
    cache_cfg: &CacheConfig,
    opts: FetchOptions,
    mut extract_fn: F,
) -> Result<CachedFetch, FetcherError>
where
    F: FnMut(&str, &Url) -> Result<ExtractResult, FetcherError>,
{
    let now = Timestamp::now().as_second();
    let host = url
        .host_str()
        .ok_or(FetcherError::Ssrf(crate::fetcher::ssrf::SsrfError::NoHost))?;

    // Robots rules are enforced before the cache is consulted, so a disallowed
    // URL is rejected even when a cached copy exists.
    let robots_skipped = !robots_cfg.respect
        || opts.ignore_robots
        || robots_cfg.ignore_domains.iter().any(|d| d == host);
    let crawl_delay: Option<std::time::Duration> = if robots_skipped {
        None
    } else {
        let entry = crate::fetcher::robots::ensure_entry(
            db,
            pacer,
            client,
            robots_cfg,
            host,
            opts.ssrf_level,
            &opts.user_agent,
            rate_cfg,
        )
        .await?;
        let verdict = crate::fetcher::robots::evaluate(&entry, &opts.user_agent, url.path());
        if matches!(verdict, crate::fetcher::robots::Verdict::Disallowed) {
            return Err(FetcherError::RobotsDisallowed {
                url: url.to_string(),
                ua: opts.user_agent.clone(),
            });
        }
        crate::fetcher::robots::crawl_delay(&entry, &opts.user_agent)
    };

    // Saturate instead of casting: a window larger than `i64::MAX` seconds would
    // otherwise wrap negative and silently disable stale-while-revalidate.
    let swr_window_secs =
        i64::try_from(cache_cfg.stale_while_revalidate_window.as_secs()).unwrap_or(i64::MAX);

    // `stale` ends up holding an expired entry that must be revalidated inline;
    // fresh hits and SWR fast-path serves return from inside the match.
    let stale: Option<Page> = if opts.force_refresh {
        None
    } else {
        match lookup_cached(db, url).await? {
            Some(p) if p.expires_at.is_some_and(|e| e > now) => {
                return Ok(CachedFetch {
                    page: p,
                    cache_status: CacheStatus::Hit,
                });
            }
            Some(p) => {
                let within_swr_window = p
                    .expires_at
                    .is_some_and(|e| now.saturating_sub(e) <= swr_window_secs);
                if within_swr_window && !opts.synchronous_revalidation {
                    let task_id = insert_revalidate_task(db, url, &p).await;
                    return Ok(CachedFetch {
                        page: p,
                        cache_status: CacheStatus::Stale {
                            revalidation_task_id: task_id,
                        },
                    });
                }
                Some(p)
            }
            None => None,
        }
    };

    // Validators come from the expired entry; without one the request is unconditional.
    let cond = match &stale {
        Some(p) => ConditionalGet {
            if_none_match: p.etag.clone(),
            if_modified_since: p.last_modified.clone(),
        },
        None => ConditionalGet::default(),
    };

    let fetched = match opts.headless_mode {
        HeadlessMode::Off | HeadlessMode::Auto => {
            match crate::fetcher::retry::with_retries(
                db,
                pacer,
                client,
                url,
                opts.ssrf_level,
                opts.ssrf_project_root.as_deref(),
                opts.har_recorder.as_ref(),
                &cond,
                crawl_delay,
                rate_cfg,
            )
            .await
            {
                Ok(f) => f,
                Err(e) => {
                    // Every path through this arm returns, so `stale` is still
                    // available to the code after the outer match.
                    if let Some(s) = stale {
                        let within_window = s
                            .expires_at
                            .is_some_and(|exp| now.saturating_sub(exp) <= swr_window_secs);
                        if within_window {
                            tracing::warn!(target: "rover::fetcher::cached",
                                error = %e, url = url.as_str(), "fetch failed; serving stale within SWR window");
                            let task_id = insert_revalidate_task(db, url, &s).await;
                            return Ok(CachedFetch {
                                page: s,
                                cache_status: CacheStatus::Stale {
                                    revalidation_task_id: task_id,
                                },
                            });
                        }
                        tracing::warn!(target: "rover::fetcher::cached",
                            error = %e, url = url.as_str(),
                            "fetch failed; stale entry is beyond SWR window — propagating error rather than serving very old content");
                    }
                    return Err(e);
                }
            }
        }
        HeadlessMode::On => {
            #[cfg(not(feature = "headless"))]
            {
                return Err(FetcherError::HeadlessFeatureNotCompiled);
            }
            #[cfg(feature = "headless")]
            {
                let r = opts
                    .headless
                    .as_ref()
                    .ok_or(FetcherError::HeadlessRendererUnavailable)?;
                let rendered = r
                    .render(url, opts.ssrf_level, opts.ssrf_project_root.as_deref())
                    .await?;
                rendered_to_fetched(rendered)
            }
        }
    };

    if fetched.status == 304 {
        // A 304 only makes sense as the answer to a conditional request. The
        // status comes from the network, so a server or proxy can send it even
        // when no cached entry exists (forced refresh, cold cache); report that
        // as a status error instead of panicking.
        let Some(mut page) = stale else {
            return Err(FetcherError::Status {
                status: fetched.status,
                url: fetched.final_url.to_string(),
            });
        };
        let decision = compute_ttl(
            now,
            host,
            fetched.cache_control.as_deref().unwrap_or(""),
            fetched.expires.as_deref(),
            cache_cfg,
        );
        let expires_at = match decision {
            TtlDecision::Cache { expires_at } => Some(expires_at),
            TtlDecision::DoNotCache => None,
        };
        // Only the timestamps change; the stored content is still valid.
        pages::touch(db, &page.url_hash, now, expires_at)
            .await
            .map_err(map_storage_err)?;
        page.fetched_at = now;
        page.expires_at = expires_at;
        return Ok(CachedFetch {
            page,
            cache_status: CacheStatus::Hit,
        });
    }

    if !(200..300).contains(&fetched.status) {
        return Err(FetcherError::Status {
            status: fetched.status,
            url: fetched.final_url.to_string(),
        });
    }

    let extracted = extract_fn(&fetched.body, &fetched.final_url)?;

    // In Auto mode the plain fetch is kept unless the page looks like an SPA and
    // a renderer is available; then both the fetch and the extraction are redone.
    let (fetched, extracted) = if opts.headless_mode == HeadlessMode::Auto {
        #[cfg(feature = "headless")]
        {
            if let Some(r) = opts.headless.as_ref() {
                let hits =
                    crate::fetcher::headless::detect::detect_spa(&fetched.body, &extracted.body_md);
                if hits.total >= 2 {
                    let rendered = r
                        .render(url, opts.ssrf_level, opts.ssrf_project_root.as_deref())
                        .await?;
                    let f2 = rendered_to_fetched(rendered);
                    let e2 = extract_fn(&f2.body, &f2.final_url)?;
                    (f2, e2)
                } else {
                    (fetched, extracted)
                }
            } else {
                (fetched, extracted)
            }
        }
        #[cfg(not(feature = "headless"))]
        {
            (fetched, extracted)
        }
    } else {
        (fetched, extracted)
    };

    let decision = compute_ttl(
        now,
        host,
        fetched.cache_control.as_deref().unwrap_or(""),
        fetched.expires.as_deref(),
        cache_cfg,
    );
    let expires_at = match decision {
        TtlDecision::Cache { expires_at } => Some(expires_at),
        TtlDecision::DoNotCache => None,
    };

    // The row is keyed by the canonical URL's hash while `url` keeps the address
    // that was requested; `lookup_cached` falls back to a lookup by `url`.
    let new_hash = url_hash(fetched.canonical_url.as_str());
    // Metadata that fails to serialize is stored as absent rather than failing the fetch.
    let metadata_json = serde_json::to_string(&extracted.metadata).ok();
    let raw_html = if cache_cfg.store_raw_html {
        Some(fetched.body.as_bytes().to_vec())
    } else {
        None
    };
    let page = Page {
        url_hash: new_hash,
        url: url.as_str().to_owned(),
        canonical_url: fetched.canonical_url.as_str().to_owned(),
        title: extracted.title.clone(),
        fetched_at: now,
        expires_at,
        etag: fetched.etag.clone(),
        last_modified: fetched.last_modified.clone(),
        content_hash: extracted.content_hash.clone(),
        extracted_md: extracted.body_md.clone(),
        metadata_json,
        raw_html,
    };

    // A do-not-cache decision still returns the page; it just is not persisted.
    if expires_at.is_some() {
        pages::upsert(db, page.clone())
            .await
            .map_err(map_storage_err)?;
    }
    Ok(CachedFetch {
        page,
        cache_status: CacheStatus::Miss,
    })
}
/// Adapts a headless render result to the `FetchedPage` shape used by the rest
/// of the cache pipeline.
///
/// The canonical URL is derived from the rendered HTML and final URL. The content
/// type is fixed to UTF-8 HTML, and every HTTP caching/validation header field is
/// left empty because a render carries none.
#[cfg(feature = "headless")]
fn rendered_to_fetched(
    rendered: crate::fetcher::headless::RenderedPage,
) -> crate::fetcher::FetchedPage {
    let crate::fetcher::headless::RenderedPage {
        html,
        final_url,
        status,
        ..
    } = rendered;

    // Must be computed while `html` and `final_url` are still borrowable,
    // i.e. before they are moved into the result below.
    let canonical_url = crate::fetcher::canonical::extract_canonical_url(&html, &final_url, None);

    crate::fetcher::FetchedPage {
        status,
        final_url,
        canonical_url,
        body: html,
        content_type: Some(String::from("text/html; charset=utf-8")),
        charset: crate::fetcher::charset::Detected::default(),
        link_header: None,
        etag: None,
        last_modified: None,
        cache_control: None,
        expires: None,
        retry_after: None,
    }
}
/// Returns the SHA-256 digest of `bytes` as a lowercase hexadecimal string.
pub fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;

    let digest = Sha256::digest(bytes);
    // Two hex characters per digest byte.
    let hex = String::with_capacity(digest.len() * 2);
    digest.iter().fold(hex, |mut acc, byte| {
        write!(acc, "{byte:02x}").expect("write to String never fails");
        acc
    })
}
/// Looks up the cached page for `url`.
///
/// The hash of the requested URL is tried first; when no row matches, the lookup
/// falls back to the stored `url` value. Storage failures are logged and returned
/// as `FetcherError::Storage`.
async fn lookup_cached(db: &Db, url: &Url) -> Result<Option<Page>, FetcherError> {
    let requested = url.as_str();
    let requested_hash = url_hash(requested);

    let by_hash = pages::get_by_url_hash(db, &requested_hash)
        .await
        .map_err(map_storage_err)?;

    match by_hash {
        Some(page) => Ok(Some(page)),
        None => pages::get_by_url(db, requested)
            .await
            .map_err(map_storage_err),
    }
}
/// Logs a storage-layer failure at error level and wraps it in
/// `FetcherError::Storage`.
///
/// Used with `map_err` on the database calls in this module.
fn map_storage_err(e: crate::storage::StorageError) -> FetcherError {
tracing::error!(target: "rover::fetcher::cached", error = %e, "storage error");
FetcherError::Storage(e)
}
/// Enqueues a background revalidate task for a stale page that is about to be served.
///
/// The task records the URL together with the validators (`etag`,
/// `last_modified`) the stale entry had at serve time, and is owned by the
/// current process id.
///
/// This is best-effort: it returns the new task id on success and `None` when the
/// parameters cannot be serialized or the insert fails (the latter is logged as a
/// warning), so the caller can still serve the stale page.
async fn insert_revalidate_task(db: &Db, url: &Url, stale: &Page) -> Option<String> {
    use crate::storage::tasks::{TaskInsert, TaskKind, insert};

    let revalidate = crate::tasks::types::RevalidateParams {
        url: url.to_string(),
        etag_at_serve: stale.etag.clone(),
        last_modified_at_serve: stale.last_modified.clone(),
    };
    let Ok(params_json) = serde_json::to_string(&revalidate) else {
        return None;
    };

    let task_id = uuid::Uuid::now_v7().to_string();
    let task = TaskInsert {
        id: task_id.clone(),
        kind: TaskKind::Revalidate,
        params_json,
        owner_pid: Some(i64::from(std::process::id())),
    };

    if let Err(e) = insert(db, task).await {
        tracing::warn!(
            target: "rover::fetcher::cached",
            error = %e,
            url = url.as_str(),
            "failed to enqueue revalidate task; serving stale without revalidation",
        );
        return None;
    }
    Some(task_id)
}
#[cfg(test)]
mod tests {
use super::*;
/// A hit and a stale serve must never compare equal, even without a task id.
#[test]
fn cache_status_eq() {
    let stale_without_task = CacheStatus::Stale {
        revalidation_task_id: None,
    };
    assert_ne!(CacheStatus::Hit, stale_without_task);
}
/// `map_storage_err` must yield the `Storage` variant, whose message starts
/// with the "storage error:" prefix.
#[test]
fn map_storage_err_routes_to_storage_variant() {
    let mapped = map_storage_err(crate::storage::StorageError::from(
        rusqlite::Error::QueryReturnedNoRows,
    ));
    let message = mapped.to_string();
    assert!(matches!(mapped, FetcherError::Storage(_)));
    assert!(message.starts_with("storage error:"));
}
/// Checks `sha256_hex` against the well-known digest of the empty input.
#[test]
fn sha256_hex_matches_known() {
    const EMPTY_INPUT_DIGEST: &str =
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
    let actual = sha256_hex(b"");
    assert_eq!(actual, EMPTY_INPUT_DIGEST);
}
// An unexpired cached entry must be returned as a hit without calling the
// extractor. Robots handling is disabled, so the hit path returns before any
// network access.
#[tokio::test]
async fn cache_hit_within_ttl() {
use crate::config::{RateLimitConfig, RobotsConfig};
use crate::fetcher::concurrency::Pacer;
use crate::storage::Db;
use std::time::Duration;
use tempfile::tempdir;
// `tmp` must stay alive for the whole test: it owns the database directory.
let tmp = tempdir().unwrap();
let db = Db::open(tmp.path().join("rover.db")).await.unwrap();
let url = Url::parse("https://example.com/").unwrap();
let now = Timestamp::now().as_second();
// Seed an entry fetched a minute ago that stays fresh for ten more minutes.
let page = Page {
url_hash: url_hash(url.as_str()),
url: url.to_string(),
canonical_url: url.to_string(),
title: Some("cached".into()),
fetched_at: now - 60,
expires_at: Some(now + 600),
etag: None,
last_modified: None,
content_hash: "x".into(),
extracted_md: "# cached".into(),
metadata_json: None,
raw_html: None,
};
pages::upsert(&db, page.clone()).await.unwrap();
let cache_cfg = CacheConfig {
default_ttl: Duration::from_secs(3600),
min_ttl: Duration::from_secs(60),
max_ttl: Duration::from_secs(86400),
stale_while_revalidate_window: Duration::from_secs(300),
override_no_store: false,
override_no_store_domains: vec![],
store_raw_html: false,
};
let rate_cfg = RateLimitConfig::default();
// Robots are not respected so the robots loader is never invoked.
let robots_cfg = RobotsConfig {
respect: false,
..RobotsConfig::default()
};
let pacer = Pacer::new(&rate_cfg);
let client = super::super::client::build_http_client("test/0.1", Duration::from_secs(5));
let result = fetch_with_cache(
&db,
&client,
&pacer,
&rate_cfg,
&robots_cfg,
&url,
&cache_cfg,
FetchOptions {
force_refresh: false,
ssrf_level: SsrfLevel::Strict,
ssrf_project_root: None,
har_recorder: None,
ignore_robots: false,
user_agent: "test/0.1".into(),
#[cfg(feature = "headless")]
headless: None,
headless_mode: HeadlessMode::Off,
synchronous_revalidation: false,
},
// Reaching the extractor would mean the cache was bypassed.
|_, _| {
panic!("extract_fn must not be called on cache hit");
},
)
.await
.unwrap();
assert_eq!(result.cache_status, CacheStatus::Hit);
assert_eq!(result.page.title.as_deref(), Some("cached"));
}
/// Builds the shared fixture for the stale-while-revalidate tests.
///
/// Returns, in order: a database opened in a fresh temporary directory, a
/// placeholder URL, a cache config using `swr_window` as its
/// stale-while-revalidate window (with `min_ttl` of zero), a default rate-limit
/// config, a robots config with `respect` turned off, a pacer, an HTTP client,
/// and the temporary directory guard.
///
/// The caller must keep the returned `TempDir` alive for as long as the
/// database is in use, since dropping it removes the directory.
#[cfg(any(test, feature = "test-loopback"))]
async fn build_swr_test_fixture(
    swr_window: std::time::Duration,
) -> (
    crate::storage::Db,
    Url,
    crate::config::CacheConfig,
    crate::config::RateLimitConfig,
    crate::config::RobotsConfig,
    crate::fetcher::concurrency::Pacer,
    reqwest::Client,
    tempfile::TempDir,
) {
    use crate::config::{RateLimitConfig, RobotsConfig};
    use crate::fetcher::concurrency::Pacer;
    use crate::storage::Db;
    use std::time::Duration;

    let tmp = tempfile::tempdir().unwrap();
    let db = Db::open(tmp.path().join("rover.db")).await.unwrap();
    let cache_cfg = CacheConfig {
        default_ttl: Duration::from_secs(3600),
        min_ttl: Duration::from_secs(0),
        max_ttl: Duration::from_secs(86400),
        stale_while_revalidate_window: swr_window,
        override_no_store: false,
        override_no_store_domains: vec![],
        store_raw_html: false,
    };
    let rate_cfg = RateLimitConfig::default();
    let robots_cfg = RobotsConfig {
        respect: false,
        ..RobotsConfig::default()
    };
    let pacer = Pacer::new(&rate_cfg);
    let client = crate::fetcher::client::build_http_client("test/0.1", Duration::from_secs(5));
    let url = Url::parse("https://placeholder.invalid/").unwrap();

    // Returned directly in the order callers destructure it; the temp-dir guard
    // goes last so it is the final element callers bind (as `_tmp`).
    (db, url, cache_cfg, rate_cfg, robots_cfg, pacer, client, tmp)
}
/// Seeds `db` with a cached page for `url` whose expiry lies `expired_secs_ago`
/// seconds before `now`; the page is recorded as fetched 60 seconds before that
/// expiry and carries no validators.
#[cfg(any(test, feature = "test-loopback"))]
async fn insert_expired_page(
    db: &crate::storage::Db,
    url: &Url,
    now: i64,
    expired_secs_ago: i64,
) {
    let expired_at = now - expired_secs_ago;
    let location = url.to_string();
    let seeded = Page {
        url_hash: url_hash(url.as_str()),
        url: location.clone(),
        canonical_url: location,
        title: Some(String::from("old")),
        fetched_at: expired_at - 60,
        expires_at: Some(expired_at),
        etag: None,
        last_modified: None,
        content_hash: String::from("old-hash"),
        extracted_md: String::from("# old"),
        metadata_json: None,
        raw_html: None,
    };
    pages::upsert(db, seeded).await.unwrap();
}
// Builds the `FetchOptions` used by the stale-while-revalidate tests: no forced
// refresh, loopback SSRF level, headless off, and `synchronous_revalidation`
// set to `sync`.
fn fetch_opts_with_sync(sync: bool) -> FetchOptions {
FetchOptions {
force_refresh: false,
ssrf_level: SsrfLevel::Loopback,
ssrf_project_root: None,
har_recorder: None,
ignore_robots: false,
user_agent: "test/0.1".into(),
#[cfg(feature = "headless")]
headless: None,
headless_mode: HeadlessMode::Off,
synchronous_revalidation: sync,
}
}
// An entry that expired 10 s ago with a 300 s window must be served stale, with
// a revalidate task row written to the database. The fixture disables robots
// and the fast path returns before any fetch, so no network access happens.
#[cfg(any(test, feature = "test-loopback"))]
#[tokio::test]
async fn expired_within_window_serves_stale_swr() {
use std::time::Duration;
// `_tmp` keeps the temporary database directory alive until the test ends.
let (db, _placeholder, cache_cfg, rate_cfg, robots_cfg, pacer, client, _tmp) =
build_swr_test_fixture(Duration::from_secs(300)).await;
let url = Url::parse("https://example.com/within-window").unwrap();
let now = Timestamp::now().as_second();
insert_expired_page(&db, &url, now, 10).await;
let result = fetch_with_cache(
&db,
&client,
&pacer,
&rate_cfg,
&robots_cfg,
&url,
&cache_cfg,
fetch_opts_with_sync(false),
|_, _| panic!("extract_fn must not be called on SWR stale-serve"),
)
.await
.expect("SWR path must succeed");
// The status must be `Stale` and must carry the id of the enqueued task.
let task_id = match &result.cache_status {
CacheStatus::Stale {
revalidation_task_id,
} => revalidation_task_id
.as_ref()
.expect("SWR path must enqueue a revalidate task"),
other => panic!("expected CacheStatus::Stale, got {other:?}"),
};
// The task id must refer to a real row of the revalidate kind.
let row = crate::storage::tasks::get(&db, task_id)
.await
.unwrap()
.expect("revalidate task row present after SWR fast-path");
assert_eq!(row.kind, crate::storage::tasks::TaskKind::Revalidate);
}
// An entry that expired 3600 s ago is outside the 300 s window, so it must not
// be served stale: the call has to fetch from the mock server, store the fresh
// page, and report a miss.
#[cfg(any(test, feature = "test-loopback"))]
#[tokio::test]
async fn expired_beyond_window_falls_through_to_sync_fetch() {
use std::time::Duration;
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
// The mock answers every GET with a cacheable 200 HTML response.
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("<html><body>fresh content here</body></html>")
.insert_header("content-type", "text/html; charset=utf-8")
.insert_header("cache-control", "max-age=60"),
)
.mount(&server)
.await;
// `_tmp` keeps the temporary database directory alive until the test ends.
let (db, _placeholder, cache_cfg, rate_cfg, robots_cfg, pacer, client, _tmp) =
build_swr_test_fixture(Duration::from_secs(300)).await;
let url = Url::parse(&format!("{}/x", server.uri())).unwrap();
let now = Timestamp::now().as_second();
insert_expired_page(&db, &url, now, 3600).await;
let result = fetch_with_cache(
&db,
&client,
&pacer,
&rate_cfg,
&robots_cfg,
&url,
&cache_cfg,
fetch_opts_with_sync(false), |_body, _base| {
Ok(ExtractResult {
title: Some("fresh".into()),
body_md: "fresh".into(),
content_hash: "fresh-hash".into(),
metadata: crate::extractor::metadata::ExtractedMetadata::default(),
})
},
)
.await
.expect("beyond-window expired entry must trigger a sync fetch");
assert_eq!(result.cache_status, CacheStatus::Miss);
// The stored row must now hold the freshly extracted content.
let row = pages::get_by_url(&db, url.as_str())
.await
.unwrap()
.expect("row present");
assert_eq!(row.content_hash, "fresh-hash");
assert!(row.fetched_at >= now);
// Exactly one request: robots are disabled in the fixture, so only the page fetch hits the server.
assert_eq!(server.received_requests().await.unwrap().len(), 1);
}
// With `synchronous_revalidation` set, an entry that expired only 10 s ago
// (inside the 300 s window) must still be refetched inline instead of being
// served stale.
#[cfg(any(test, feature = "test-loopback"))]
#[tokio::test]
async fn synchronous_revalidation_bypasses_swr_within_window() {
use std::time::Duration;
use wiremock::matchers::method;
use wiremock::{Mock, MockServer, ResponseTemplate};
// The mock answers every GET with a cacheable 200 HTML response.
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("<html><body>fresh</body></html>")
.insert_header("content-type", "text/html; charset=utf-8")
.insert_header("cache-control", "max-age=60"),
)
.mount(&server)
.await;
// `_tmp` keeps the temporary database directory alive until the test ends.
let (db, _placeholder, cache_cfg, rate_cfg, robots_cfg, pacer, client, _tmp) =
build_swr_test_fixture(Duration::from_secs(300)).await;
let url = Url::parse(&format!("{}/y", server.uri())).unwrap();
let now = Timestamp::now().as_second();
insert_expired_page(&db, &url, now, 10).await;
let result = fetch_with_cache(
&db,
&client,
&pacer,
&rate_cfg,
&robots_cfg,
&url,
&cache_cfg,
fetch_opts_with_sync(true), |_body, _base| {
Ok(ExtractResult {
title: Some("fresh".into()),
body_md: "fresh".into(),
content_hash: "fresh-hash".into(),
metadata: crate::extractor::metadata::ExtractedMetadata::default(),
})
},
)
.await
.expect("synchronous opt-out must trigger a sync fetch");
assert_eq!(result.cache_status, CacheStatus::Miss);
// The stored row must have been refreshed by this call.
let row = pages::get_by_url(&db, url.as_str())
.await
.unwrap()
.expect("row present");
assert!(row.fetched_at >= now);
// Exactly one request: robots are disabled in the fixture, so only the page fetch hits the server.
assert_eq!(server.received_requests().await.unwrap().len(), 1);
}
}