pub mod browser;
pub mod detect;
pub mod intercept;
pub mod third_party;
use thiserror::Error;
use url::Url;
/// Tri-state headless-rendering setting: `off`, `on`, or `auto`.
///
/// NOTE(review): how `Auto` decides whether to render is not defined in this
/// file — confirm against the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HeadlessMode {
    Off,
    On,
    Auto,
}

impl HeadlessMode {
    /// Returns the lowercase name of this mode: `"off"`, `"on"`, or `"auto"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Off => "off",
            Self::On => "on",
            Self::Auto => "auto",
        }
    }
}
/// Output of a successful `HeadlessRenderer::render` call.
#[derive(Debug, Clone)]
pub struct RenderedPage {
/// URL reported by the page after navigation; `render` falls back to the
/// requested URL when the page reports none or it does not parse.
pub final_url: Url,
/// Page content as returned by the browser (`Page::content`) after the wait.
pub html: String,
/// NOTE(review): `render` currently always sets this to 200; the real HTTP
/// status of the navigation is not captured.
pub status: u16,
}
/// Errors produced while launching the headless browser or rendering a page.
#[derive(Debug, Error)]
pub enum HeadlessError {
/// Browser startup failed. NOTE(review): not constructed in this file;
/// presumably raised by `browser::launch` — confirm.
#[error("browser launch failed: {0}")]
LaunchFailed(String),
/// Headless configuration rejected. NOTE(review): not constructed in this
/// file — confirm where it is raised.
#[error("browser config invalid: {0}")]
ConfigInvalid(String),
/// The post-navigation wait in `render` failed or exceeded the configured
/// timeout; `timeout_secs` is that timeout in whole seconds.
#[error("render timeout after {timeout_secs}s on {url}")]
Timeout { url: String, timeout_secs: u32 },
/// NOTE(review): not constructed in this file — confirm where it is raised.
#[error("page closed unexpectedly: {0}")]
PageClosed(String),
/// A Chrome DevTools Protocol call failed; holds the stringified error.
#[error("CDP error: {0}")]
Cdp(String),
/// The renderer's concurrency semaphore was closed while acquiring a permit.
#[error("renderer semaphore closed")]
SemaphoreClosed,
}
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::task::JoinHandle;
use chromiumoxide::Browser;
use chromiumoxide::cdp::browser_protocol::fetch::{
EnableParams as FetchEnableParams, EventRequestPaused,
};
use chromiumoxide::cdp::browser_protocol::network::{
EnableParams as NetworkEnableParams, EventLoadingFailed, EventLoadingFinished,
EventRequestWillBeSent, RequestId,
};
use futures::StreamExt;
use crate::config::HeadlessConfig;
use crate::fetcher::ssrf::SsrfLevel;
/// A launched headless browser plus the limits applied to renders on it.
pub struct HeadlessRenderer {
/// chromiumoxide browser used to open one tab per render.
browser: Browser,
/// Task returned by `browser::launch`; aborted in `shutdown`.
/// NOTE(review): presumably drives the CDP handler loop — confirm in `browser`.
handler_task: JoinHandle<()>,
/// Bounds the number of concurrent `render` calls.
permit: Arc<Semaphore>,
/// Full config, consulted on every render (wait strategy, timeout, interception).
asset_cfg: HeadlessConfig,
/// Held only so the temporary profile directory lives as long as the
/// renderer (`TempDir` removes its directory when dropped).
_profile_dir: tempfile::TempDir,
}
impl std::fmt::Debug for HeadlessRenderer {
    /// Prints only the config; the browser, task, semaphore and profile-dir
    /// fields are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("HeadlessRenderer");
        out.field("asset_cfg", &self.asset_cfg);
        out.finish_non_exhaustive()
    }
}
impl HeadlessRenderer {
    /// Permit count used when `HeadlessConfig::max_concurrent` is `0`.
    const DEFAULT_MAX_CONCURRENT: usize = 4;

    /// Launches a browser via `browser::launch` and wraps it in a renderer.
    ///
    /// At most `cfg.max_concurrent` renders may hold a tab at once; a value of
    /// `0` falls back to `DEFAULT_MAX_CONCURRENT` (4).
    ///
    /// # Errors
    /// Propagates any error returned by `browser::launch`.
    pub async fn new(cfg: &HeadlessConfig) -> Result<Self, HeadlessError> {
        let (browser, handler_task, profile_dir) = browser::launch(cfg).await?;
        let permits = if cfg.max_concurrent == 0 {
            Self::DEFAULT_MAX_CONCURRENT
        } else {
            cfg.max_concurrent
        };
        Ok(Self {
            browser,
            handler_task,
            permit: Arc::new(Semaphore::new(permits)),
            asset_cfg: cfg.clone(),
            _profile_dir: profile_dir,
        })
    }

    /// Renders `url` in a fresh browser tab and returns the resulting HTML.
    ///
    /// Waits for a concurrency permit, opens an `about:blank` tab, routes every
    /// paused request through `intercept::handle_paused` with the given SSRF
    /// settings, navigates, and then waits either for network idle (when
    /// `default_wait` is `"networkidle0"`) or via `wait_dom_content_loaded`.
    /// The tab is closed on every success and error path.
    ///
    /// # Errors
    /// - [`HeadlessError::SemaphoreClosed`] if the permit semaphore was closed.
    /// - [`HeadlessError::Timeout`] if the post-navigation wait fails or
    ///   exceeds the configured timeout.
    /// - [`HeadlessError::Cdp`] for any other browser-protocol failure.
    pub async fn render(
        &self,
        url: &url::Url,
        ssrf_level: SsrfLevel,
        ssrf_project_root: Option<&std::path::Path>,
    ) -> Result<RenderedPage, HeadlessError> {
        let _guard = self
            .permit
            .acquire()
            .await
            .map_err(|_| HeadlessError::SemaphoreClosed)?;
        let page = self
            .browser
            .new_page("about:blank")
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?;
        // Every outcome of the helper funnels through here, so the tab is closed
        // even when interception setup fails (previously leaked).
        let result = self
            .render_on_page(&page, url, ssrf_level, ssrf_project_root)
            .await;
        let _ = page.close().await;
        result
    }

    /// Does the actual work of `render` on an already-open `page`.
    ///
    /// Any listener tasks spawned here are aborted when this function returns
    /// (or its future is dropped), before the caller closes the tab.
    async fn render_on_page(
        &self,
        page: &chromiumoxide::Page,
        url: &url::Url,
        ssrf_level: SsrfLevel,
        ssrf_project_root: Option<&std::path::Path>,
    ) -> Result<RenderedPage, HeadlessError> {
        // Subscribe before enabling Fetch so there is never a window in which a
        // request could be paused with nobody listening to resume it.
        let mut events = page
            .event_listener::<EventRequestPaused>()
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?;
        page.execute(FetchEnableParams::default())
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?;

        let asset_cfg = self.asset_cfg.clone();
        let project_root = ssrf_project_root.map(|p| p.to_path_buf());
        let page_for_intercept = page.clone();
        // A bare `JoinHandle` only detaches on drop; the guard aborts the task on
        // every exit, including cancellation of the render future.
        let _intercept = InterceptTaskGuard(tokio::spawn(async move {
            while let Some(event) = events.next().await {
                // Per-request interception failures are deliberately ignored.
                let _ = intercept::handle_paused(
                    &page_for_intercept,
                    (*event).clone(),
                    &asset_cfg,
                    ssrf_level,
                    project_root.as_deref(),
                )
                .await;
            }
        }));

        // Must be running before navigation so the first requests are counted.
        let idle_tracker = if self.asset_cfg.default_wait == "networkidle0" {
            Some(NetworkIdleTracker::start(page).await?)
        } else {
            None
        };

        // NOTE(review): navigation itself is not bounded by `timeout`; it relies
        // on whatever request timeout the CDP handler was launched with.
        page.goto(url.as_str())
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?;

        let timeout = self.asset_cfg.timeout();
        let waited = match &idle_tracker {
            Some(tracker) => tracker.wait_until_idle(page, timeout).await,
            None => wait_dom_content_loaded(page, timeout).await,
        };
        // Any wait failure (timeout or CDP error) is reported as a timeout.
        if waited.is_err() {
            return Err(HeadlessError::Timeout {
                url: url.to_string(),
                // Saturate instead of silently truncating absurdly large timeouts.
                timeout_secs: u32::try_from(timeout.as_secs()).unwrap_or(u32::MAX),
            });
        }

        let html = page
            .content()
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?;
        let final_url = page
            .url()
            .await
            .map_err(|e| HeadlessError::Cdp(e.to_string()))?
            .and_then(|s| url::Url::parse(&s).ok())
            .unwrap_or_else(|| url.clone());
        Ok(RenderedPage {
            final_url,
            html,
            // NOTE(review): the real HTTP status is not captured; always 200.
            status: 200,
        })
    }

    /// Closes the browser, awaits `Browser::wait`, then aborts the handler task.
    ///
    /// Errors from closing or waiting are ignored (best-effort shutdown).
    pub async fn shutdown(mut self) {
        let _ = self.browser.close().await;
        let _ = self.browser.wait().await;
        self.handler_task.abort();
    }
}

/// Owns the request-interception task of a single render and aborts it on drop.
///
/// Dropping a plain [`JoinHandle`] detaches the task, which would keep the
/// interceptor (and its clone of the page) alive after the render is gone.
struct InterceptTaskGuard(JoinHandle<()>);

impl Drop for InterceptTaskGuard {
    fn drop(&mut self) {
        self.0.abort();
    }
}
/// Cloneable, lazily-initialized handle to a shared [`HeadlessRenderer`].
///
/// Clones share the same cell, so the renderer is launched at most once.
#[derive(Clone)]
pub struct HeadlessHandle {
/// Holds the renderer once the first `get` call has launched it.
cell: Arc<tokio::sync::OnceCell<Arc<HeadlessRenderer>>>,
/// Config used to launch the renderer on first `get`.
cfg: HeadlessConfig,
}
impl std::fmt::Debug for HeadlessHandle {
    /// Reports only whether the renderer has been launched; other fields are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let launched = self.cell.get().is_some();
        let mut out = f.debug_struct("HeadlessHandle");
        out.field("initialized", &launched);
        out.finish_non_exhaustive()
    }
}
impl HeadlessHandle {
    /// Creates a handle with a fresh, empty cell; no browser is launched yet.
    pub fn new(cfg: HeadlessConfig) -> Self {
        Self::with_cell(Arc::new(tokio::sync::OnceCell::new()), cfg)
    }

    /// Creates a handle around an existing (possibly shared) cell.
    pub fn with_cell(
        cell: Arc<tokio::sync::OnceCell<Arc<HeadlessRenderer>>>,
        cfg: HeadlessConfig,
    ) -> Self {
        Self { cell, cfg }
    }

    /// Returns the shared renderer, launching it on first use.
    ///
    /// Concurrent first callers share one initialization attempt; if it fails,
    /// the cell stays empty and a later call retries.
    ///
    /// # Errors
    /// Propagates the error from `HeadlessRenderer::new`.
    pub async fn get(&self) -> Result<Arc<HeadlessRenderer>, HeadlessError> {
        let launch_cfg = self.cfg.clone();
        let renderer = self
            .cell
            .get_or_try_init(|| async move {
                HeadlessRenderer::new(&launch_cfg).await.map(Arc::new)
            })
            .await?;
        Ok(Arc::clone(renderer))
    }

    /// True once the renderer has been launched through this cell.
    pub fn is_initialized(&self) -> bool {
        self.cell.get().is_some()
    }

    /// Delay value taken from the config's `launch_delay()`.
    pub fn launch_delay(&self) -> std::time::Duration {
        self.cfg.launch_delay()
    }

    /// Shuts the renderer down if this is the last reference to it.
    ///
    /// If the cell or the renderer is still shared, a warning is logged and no
    /// explicit close is attempted. An empty cell is a no-op.
    pub async fn shutdown(self) {
        let sole_cell = match Arc::try_unwrap(self.cell) {
            Ok(cell) => cell,
            Err(_) => {
                tracing::warn!(
                    target: "rover::fetcher::headless",
                    "headless handle still shared at shutdown; skipping explicit close",
                );
                return;
            }
        };
        let shared_renderer = match sole_cell.into_inner() {
            Some(renderer) => renderer,
            None => return,
        };
        match Arc::try_unwrap(shared_renderer) {
            Ok(renderer) => renderer.shutdown().await,
            Err(_) => {
                tracing::warn!(
                    target: "rover::fetcher::headless",
                    "headless renderer still has outstanding references at shutdown; skipping explicit close",
                );
            }
        }
    }
}
/// Awaits `page.wait_for_navigation()`, bounded by `timeout`.
///
/// NOTE(review): despite the name, this does not listen for the
/// `DOMContentLoaded` event itself; it relies on what chromiumoxide's
/// `wait_for_navigation` waits for — confirm that matches the intent.
///
/// # Errors
/// [`HeadlessError::Cdp`] on timeout or if the navigation wait fails.
async fn wait_dom_content_loaded(
    page: &chromiumoxide::Page,
    timeout: Duration,
) -> Result<(), HeadlessError> {
    match tokio::time::timeout(timeout, page.wait_for_navigation()).await {
        Err(_elapsed) => Err(HeadlessError::Cdp("dom_content_loaded timeout".into())),
        Ok(Err(cdp_err)) => Err(HeadlessError::Cdp(cdp_err.to_string())),
        Ok(Ok(_)) => Ok(()),
    }
}
/// Tracks in-flight network requests of a page so `render` can implement the
/// `networkidle0` wait.
struct NetworkIdleTracker {
/// Ids of requests that have started but not yet finished or failed.
inflight: Arc<StdMutex<HashSet<RequestId>>>,
/// Event-listener tasks spawned by `start`; aborted when the tracker is dropped.
tasks: Vec<JoinHandle<()>>,
}
impl NetworkIdleTracker {
async fn start(page: &chromiumoxide::Page) -> Result<Self, HeadlessError> {
page.execute(NetworkEnableParams::default())
.await
.map_err(|e| HeadlessError::Cdp(e.to_string()))?;
let inflight: Arc<StdMutex<HashSet<RequestId>>> = Arc::new(StdMutex::new(HashSet::new()));
let mut will = page
.event_listener::<EventRequestWillBeSent>()
.await
.map_err(|e| HeadlessError::Cdp(e.to_string()))?;
let mut finished = page
.event_listener::<EventLoadingFinished>()
.await
.map_err(|e| HeadlessError::Cdp(e.to_string()))?;
let mut failed = page
.event_listener::<EventLoadingFailed>()
.await
.map_err(|e| HeadlessError::Cdp(e.to_string()))?;
let started = inflight.clone();
let t_will: JoinHandle<()> = tokio::spawn(async move {
while let Some(e) = will.next().await {
started.lock().unwrap().insert(e.request_id.clone());
}
});
let done = inflight.clone();
let t_finished: JoinHandle<()> = tokio::spawn(async move {
while let Some(e) = finished.next().await {
done.lock().unwrap().remove(&e.request_id);
}
});
let errored = inflight.clone();
let t_failed: JoinHandle<()> = tokio::spawn(async move {
while let Some(e) = failed.next().await {
errored.lock().unwrap().remove(&e.request_id);
}
});
Ok(Self {
inflight,
tasks: vec![t_will, t_finished, t_failed],
})
}
async fn wait_until_idle(
&self,
page: &chromiumoxide::Page,
timeout: Duration,
) -> Result<(), HeadlessError> {
use std::time::Instant;
let poll = self.inflight.clone();
let waited = tokio::time::timeout(timeout, async {
page.wait_for_navigation()
.await
.map_err(|e| HeadlessError::Cdp(e.to_string()))?;
let mut quiet_since: Option<Instant> = None;
loop {
let n = poll.lock().unwrap().len();
if n == 0 {
let since = *quiet_since.get_or_insert_with(Instant::now);
if since.elapsed() >= Duration::from_millis(500) {
return Ok::<(), HeadlessError>(());
}
} else {
quiet_since = None;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await;
match waited {
Ok(inner) => inner,
Err(_elapsed) => Err(HeadlessError::Cdp("network idle0 wait timed out".into())),
}
}
}
impl Drop for NetworkIdleTracker {
fn drop(&mut self) {
for t in &self.tasks {
t.abort();
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each mode maps to its expected lowercase name.
    #[test]
    fn headless_mode_as_str_round_trips() {
        let cases = [
            (HeadlessMode::Off, "off"),
            (HeadlessMode::On, "on"),
            (HeadlessMode::Auto, "auto"),
        ];
        for (mode, expected) in cases {
            assert_eq!(mode.as_str(), expected);
        }
    }
}