crawlberg 1.0.1

High-performance web crawling engine
Documentation
//! Builder for [`CrawlEngine`].

use std::sync::Arc;

use crate::defaults;
use crate::error::CrawlError;
#[cfg(not(target_arch = "wasm32"))]
use crate::sink::EventSink;
use crate::traits::*;
use crate::types::*;

use super::CrawlEngine;

/// Builder for [`CrawlEngine`].
///
/// Any field left unset will be filled with a default implementation
/// from the crate's internal `defaults` module.
///
/// # Pool injection
///
/// For long-lived processes (e.g. a worker service that handles many jobs), construct
/// the browser pool(s) once at startup and inject them via the builder methods rather
/// than relying on per-engine pool construction:
///
/// ```rust,ignore
/// use crawlberg::{BrowserPool, BrowserPoolConfig, CrawlEngine};
///
/// let pool = BrowserPool::new(BrowserPoolConfig::default());
/// pool.warm().await?;
///
/// let engine = CrawlEngine::builder()
///     .with_browser_pool(pool)
///     .build()?;
/// // The engine reuses the same Chrome instance across all crawls.
/// ```
pub struct CrawlEngineBuilder {
    config: Option<CrawlConfig>,
    frontier: Option<Arc<dyn Frontier>>,
    rate_limiter: Option<Arc<dyn RateLimiter>>,
    store: Option<Arc<dyn CrawlStore>>,
    event_emitter: Option<Arc<dyn EventEmitter>>,
    strategy: Option<Arc<dyn CrawlStrategy>>,
    content_filter: Option<Arc<dyn ContentFilter>>,
    cache: Option<Arc<dyn CrawlCache>>,
    #[cfg(not(target_arch = "wasm32"))]
    event_sink: Option<Arc<dyn EventSink>>,
    page_budget: Option<Arc<dyn crate::budget::PageBudget>>,
    #[cfg(feature = "browser")]
    browser_pool: Option<Arc<crate::browser_pool::BrowserPool>>,
    #[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
    native_executor: Option<Arc<crawlberg_browser::adapter::NativeBrowserExecutor>>,
    proxy_provider: Option<Arc<dyn crate::ProxyProvider>>,
}

impl CrawlEngineBuilder {
    /// Create a new builder with no fields set.
    pub fn new() -> Self {
        Self {
            config: None,
            frontier: None,
            rate_limiter: None,
            store: None,
            event_emitter: None,
            strategy: None,
            content_filter: None,
            cache: None,
            #[cfg(not(target_arch = "wasm32"))]
            event_sink: None,
            page_budget: None,
            #[cfg(feature = "browser")]
            browser_pool: None,
            #[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
            native_executor: None,
            proxy_provider: None,
        }
    }

    /// Set the crawl configuration.
    pub fn config(mut self, config: CrawlConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Set the frontier implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn frontier(mut self, frontier: impl Frontier + 'static) -> Self {
        self.frontier = Some(Arc::new(frontier));
        self
    }

    /// Set the rate limiter implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn rate_limiter(mut self, rate_limiter: impl RateLimiter + 'static) -> Self {
        self.rate_limiter = Some(Arc::new(rate_limiter));
        self
    }

    /// Set the store implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn store(mut self, store: impl CrawlStore + 'static) -> Self {
        self.store = Some(Arc::new(store));
        self
    }

    /// Set the event emitter implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn event_emitter(mut self, event_emitter: impl EventEmitter + 'static) -> Self {
        self.event_emitter = Some(Arc::new(event_emitter));
        self
    }

    /// Set the crawl strategy implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn strategy(mut self, strategy: impl CrawlStrategy + 'static) -> Self {
        self.strategy = Some(Arc::new(strategy));
        self
    }

    /// Set the content filter implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn content_filter(mut self, content_filter: impl ContentFilter + 'static) -> Self {
        self.content_filter = Some(Arc::new(content_filter));
        self
    }

    /// Set the persistent cache implementation.
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn cache(mut self, cache: impl CrawlCache + 'static) -> Self {
        self.cache = Some(Arc::new(cache));
        self
    }

    /// Set the event sink for streaming crawl events.
    ///
    /// The event sink receives [`CrawlEvent`]s as pages are processed, allowing
    /// consumers to integrate with external systems (NATS, dashboards, analytics, etc.)
    /// without crawlberg depending on those backends.
    ///
    /// [`CrawlEvent`]: crate::CrawlEvent
    #[cfg(not(target_arch = "wasm32"))]
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn event_sink(mut self, event_sink: impl EventSink + 'static) -> Self {
        self.event_sink = Some(Arc::new(event_sink));
        self
    }

    /// Set the page budget hook for controlling crawl extent.
    ///
    /// The page budget is consulted before each page fetch. Returning
    /// `Err(BudgetError::Exhausted)` halts the crawl gracefully.
    ///
    /// Defaults to [`DefaultPageBudget`] if not set.
    ///
    /// [`DefaultPageBudget`]: crate::budget::DefaultPageBudget
    #[allow(dead_code)]
    #[cfg_attr(alef, alef(skip))]
    pub fn page_budget(mut self, page_budget: impl crate::budget::PageBudget + 'static) -> Self {
        self.page_budget = Some(Arc::new(page_budget));
        self
    }

    /// Inject a pre-built [`BrowserPool`] for chromiumoxide-backed browser fetches.
    ///
    /// When set, the engine reuses this pool across all crawl operations rather than
    /// constructing a new pool per engine or per request. Intended for long-lived
    /// worker processes that need to amortise Chrome startup cost.
    ///
    /// The injected pool takes precedence over any pool stored in `CrawlConfig.browser_pool`.
    ///
    /// [`BrowserPool`]: crate::browser_pool::BrowserPool
    #[cfg(feature = "browser")]
    #[cfg_attr(alef, alef(skip))]
    pub fn with_browser_pool(mut self, pool: Arc<crate::browser_pool::BrowserPool>) -> Self {
        self.browser_pool = Some(pool);
        self
    }

    /// Inject a pre-built [`NativeBrowserExecutor`] for native-backend browser fetches.
    ///
    /// When set, the engine reuses this executor across all crawl operations rather than
    /// constructing a new thread-pool per engine. Intended for long-lived worker processes
    /// that need to amortise native browser worker startup cost.
    ///
    /// The injected executor takes precedence over the one constructed from config.
    ///
    /// [`NativeBrowserExecutor`]: crawlberg_browser::adapter::NativeBrowserExecutor
    #[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
    #[cfg_attr(alef, alef(skip))]
    pub fn with_native_executor(mut self, executor: Arc<crawlberg_browser::adapter::NativeBrowserExecutor>) -> Self {
        self.native_executor = Some(executor);
        self
    }

    /// Inject a [`crate::ProxyProvider`] for per-request proxy rotation on the
    /// reqwest HTTP path. Stored on the resolved [`CrawlConfig`] as
    /// `proxy_provider`; takes precedence over the static
    /// `CrawlConfig::proxy` value when both are set.
    ///
    /// Browser-backend proxies (`CrawlConfig::browser::proxy`) still read the
    /// static `ProxyConfig` value — provider rotation only applies to the HTTP
    /// fetcher.
    #[cfg_attr(alef, alef(skip))]
    pub fn with_proxy_provider(mut self, provider: Arc<dyn crate::ProxyProvider>) -> Self {
        self.proxy_provider = Some(provider);
        self
    }

    /// Build the [`CrawlEngine`] with the configured options.
    ///
    /// Config validation is deferred to the first operation (scrape, crawl, etc.) so that
    /// the engine can always be constructed and individual operations report validation errors.
    pub fn build(self) -> Result<CrawlEngine, CrawlError> {
        // `config` needs to be mutable only when the `browser` feature is active
        // (to inject `browser_pool`); suppress the warning on other feature combinations.
        #[allow(unused_mut)]
        let mut config = self.config.unwrap_or_default();

        // Apply the injected browser pool to the config so the engine's fetch paths
        // pick it up from `config.browser_pool`. The builder field takes precedence
        // over any pool that was already embedded in the config.
        #[cfg(feature = "browser")]
        if let Some(pool) = self.browser_pool {
            config.browser_pool = Some(pool);
        }

        if let Some(provider) = self.proxy_provider {
            config.proxy_provider = Some(provider);
        }

        // CRAWLBERG_ALLOW_PRIVATE_NETWORK is an operator-side override that must
        // win regardless of how the caller built `config.ssrf`. Several
        // alef-generated bindings (Elixir NIF, PHP, WASM, Ruby) fall back to
        // `SsrfPolicy::default()` when their host-side `ssrf` field is absent,
        // hardcoding `deny_private: true` and silently overriding the env var
        // set by e2e harnesses / docker-compose. Applying the override at
        // engine-construction time keeps `SsrfPolicy::default()` clean and
        // makes the env var the single source of truth.
        if std::env::var("CRAWLBERG_ALLOW_PRIVATE_NETWORK")
            .map(|v| v.to_lowercase())
            .ok()
            .is_some_and(|v| v == "1" || v == "true")
        {
            config.ssrf.deny_private = false;
        }

        // An empty scheme_allowlist is never meaningful caller intent through any
        // binding: `scheme_allowlist` is `#[serde(skip)]` / `#[alef(skip)]`, so no
        // binding can populate it deliberately. Empty universally means the
        // construction path (e.g. `Default::default()` in generated FFI glue) did
        // not fill it. Treat empty as "use the default allowlist" — symmetric with
        // the deny_private env override above. Without this, every HTTP/HTTPS
        // request fails with `disallowed scheme: http`.
        if config.ssrf.scheme_allowlist.is_empty() {
            config.ssrf.scheme_allowlist = crate::net::ssrf::default_scheme_allowlist();
        }

        let rate_limit_ms = config.rate_limit_ms.unwrap_or(200);
        #[cfg(not(target_arch = "wasm32"))]
        let ua_rotation = crate::tower::UaRotationLayer::new(config.user_agents.clone());

        // Native executor: use the injected one when present; otherwise build from config.
        #[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
        let native_browser_executor = if let Some(executor) = self.native_executor {
            // Backend may not be Native in config yet, but the caller explicitly provided
            // an executor — honour it regardless so callers can pre-build and inject.
            Some(executor)
        } else {
            build_native_browser_executor(&config)?
        };

        Ok(CrawlEngine {
            config,
            frontier: self
                .frontier
                .unwrap_or_else(|| Arc::new(defaults::InMemoryFrontier::new())),
            rate_limiter: self.rate_limiter.unwrap_or_else(|| {
                Arc::new(defaults::PerDomainThrottle::new(std::time::Duration::from_millis(
                    rate_limit_ms,
                )))
            }),
            store: self.store.unwrap_or_else(|| Arc::new(defaults::NoopStore)),
            event_emitter: self.event_emitter.unwrap_or_else(|| Arc::new(defaults::NoopEmitter)),
            strategy: self.strategy.unwrap_or_else(|| Arc::new(defaults::BfsStrategy)),
            content_filter: self.content_filter.unwrap_or_else(|| Arc::new(defaults::NoopFilter)),
            cache: self.cache.unwrap_or_else(|| Arc::new(defaults::NoopCache)),
            #[cfg(not(target_arch = "wasm32"))]
            event_sink: self.event_sink,
            page_budget: self
                .page_budget
                .unwrap_or_else(|| Arc::new(crate::budget::DefaultPageBudget)),
            #[cfg(not(target_arch = "wasm32"))]
            ua_rotation,
            #[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
            native_browser_executor,
        })
    }
}

#[cfg(all(not(target_arch = "wasm32"), feature = "browser-native"))]
fn build_native_browser_executor(
    config: &CrawlConfig,
) -> Result<Option<Arc<crawlberg_browser::adapter::NativeBrowserExecutor>>, CrawlError> {
    if config.browser.backend != BrowserBackend::Native {
        return Ok(None);
    }

    let executor_config = match config.max_concurrent {
        Some(workers) if workers > 0 => crawlberg_browser::adapter::NativeBrowserExecutorConfig::with_workers(workers),
        _ => crawlberg_browser::adapter::NativeBrowserExecutorConfig::default(),
    };
    let executor = crawlberg_browser::adapter::NativeBrowserExecutor::new(executor_config)
        .map_err(|e| CrawlError::BrowserError(format!("failed to start native browser executor: {e}")))?;
    Ok(Some(Arc::new(executor)))
}

impl Default for CrawlEngineBuilder {
    fn default() -> Self {
        Self::new()
    }
}