use crw_core::Deadline;
use crw_core::config::AppConfig;
use crw_core::error::{CrwError, CrwResult};
use crw_core::types::{
CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, ScrapeRequest,
resolve_pinned_renderer, resolve_render_js,
};
use crw_crawl::crawl::{CrawlOptions, run_crawl};
use crw_crawl::single::scrape_url;
use crw_renderer::FallbackRenderer;
use crw_search::SearxngClient;
use futures::stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, watch};
use uuid::Uuid;
/// Checks that a renderer pinned by the request is one this server can actually use.
///
/// Returns `Ok(())` without further checks when no renderer is pinned, or when JS
/// rendering resolves to `Some(false)` for this request. A pin that comes without an
/// explicit `render_js` is treated as `render_js = Some(true)` before it is resolved
/// against the configured default.
///
/// # Errors
///
/// Returns [`CrwError::InvalidRequest`] when the pinned renderer is not among the
/// names reported by `state.renderer.js_renderer_names()`.
pub(crate) fn validate_renderer_pin(
    renderer: Option<RequestedRenderer>,
    render_js: Option<bool>,
    state: &AppState,
) -> CrwResult<()> {
    let pinned = match resolve_pinned_renderer(renderer) {
        Some(pinned) => pinned,
        // Nothing pinned: there is nothing to validate.
        None => return Ok(()),
    };

    // An omitted `render_js` alongside a pin counts as an explicit `true`.
    let requested_js = render_js.or(Some(true));
    let js_disabled =
        resolve_render_js(requested_js, state.config.renderer.render_js_default) == Some(false);
    if js_disabled {
        return Ok(());
    }

    let configured = state.renderer.js_renderer_names();
    if configured.contains(&pinned) {
        return Ok(());
    }

    Err(CrwError::InvalidRequest(format!(
        "renderer '{}' not available; configured renderers: [{}]. \
         Update server config or omit the 'renderer' field.",
        pinned,
        configured.join(", ")
    )))
}
/// Runs [`validate_renderer_pin`] using the `renderer` and `render_js` fields of a
/// crawl request.
///
/// # Errors
///
/// Returns whatever error [`validate_renderer_pin`] returns.
pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
    let (pinned, render_js) = (req.renderer, req.render_js);
    validate_renderer_pin(pinned, render_js, state)
}
/// Registry entry for one background crawl or batch job, stored in
/// `AppState::crawl_jobs` under the job's id.
pub struct CrawlJob {
/// Receiving side of the watch channel on which the job's task publishes its
/// `CrawlState`.
pub rx: watch::Receiver<CrawlState>,
/// When the entry was created; the cleanup task compares its age against the
/// configured job TTL.
pub created_at: Instant,
/// Handle for aborting the spawned job task, when one has been attached.
pub abort_handle: Option<tokio::task::AbortHandle>,
}
/// Number of permits `AppState::crawl_semaphore` is created with; every crawl or
/// batch task acquires one permit before doing its work.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// How long the background cleanup task sleeps between sweeps of the job maps.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Lifecycle state of an extract job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtractStatus {
    /// The job has been registered and has not finished yet.
    Processing,
    /// The job finished and produced a result.
    Completed,
    /// The job finished with an error.
    Failed,
}

impl ExtractStatus {
    /// Returns the lowercase string name of this status
    /// (`"processing"`, `"completed"` or `"failed"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
/// Stored state of one extract job, kept in `AppState::extract_jobs` under the
/// job's id.
#[derive(Debug, Clone)]
pub struct ExtractRecord {
/// Current lifecycle state of the job.
pub status: ExtractStatus,
/// Merged JSON object built from the scraped pages; set when the job completes.
pub data: Option<serde_json::Value>,
/// Sum of the `total_tokens` reported in the LLM usage of the scraped pages.
pub tokens_used: u32,
/// Credits accumulated for the job; written as at least 1 once the job finishes.
pub credits_used: u32,
/// Message of the last scrape error; set when the job is marked failed.
pub error: Option<String>,
/// When the record was created; the cleanup task compares its age against the
/// configured job TTL.
pub created_at: Instant,
}
/// Shared server state. Every field is behind an `Arc` (or is an `Option` of one),
/// so cloning an `AppState` yields another handle to the same underlying data.
#[derive(Clone)]
pub struct AppState {
/// The application configuration this state was built from.
pub config: Arc<AppConfig>,
/// Renderer shared by all scrape, crawl, batch and extract work.
pub renderer: Arc<FallbackRenderer>,
/// Crawl and batch jobs, keyed by job id.
pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
/// Extract jobs, keyed by job id.
pub extract_jobs: Arc<RwLock<HashMap<Uuid, ExtractRecord>>>,
/// Semaphore created with `MAX_CONCURRENT_CRAWLS` permits; crawl and batch tasks
/// acquire a permit before running.
pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
/// SearXNG client; `AppState::new` sets it only when search is enabled in the
/// config and a SearXNG URL is configured.
pub searxng: Option<Arc<SearxngClient>>,
/// URL filter configuration; `AppState::new` builds it from `config.map.url_filter`.
pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
}
impl AppState {
/// Builds the shared application state from `config` and starts the background
/// cleanup task.
///
/// In order, this:
/// 1. creates the [`FallbackRenderer`] and applies the per-host limits from
///    `config.crawler`;
/// 2. creates a [`SearxngClient`] when search is enabled and a SearXNG URL is set;
/// 3. builds the URL filter config and adds the number of loaded rules to the
///    `map_filter_rules_loaded` metric, one label per rule category;
/// 4. spawns a task that, every [`JOB_CLEANUP_INTERVAL`], removes finished crawl
///    jobs and non-processing extract jobs older than `crawler.job_ttl_secs`.
///
/// # Errors
///
/// Propagates the error from `FallbackRenderer::new`, and returns
/// [`CrwError::Internal`] if the SearXNG HTTP client cannot be built.
///
/// # Panics
///
/// Calls `tokio::spawn`, which panics when invoked outside a Tokio runtime.
pub fn new(config: AppConfig) -> CrwResult<Self> {
    let renderer = FallbackRenderer::new(
        &config.renderer,
        &config.crawler.user_agent,
        config.crawler.proxy.as_deref(),
        &config.crawler.stealth,
    )?
    .with_host_limits(
        config.crawler.requests_per_second,
        config.crawler.per_host_max_concurrent,
    );

    let searxng = match config.search.searxng_url.as_ref() {
        Some(url) if config.search.enabled => {
            let http = reqwest::Client::builder()
                .connect_timeout(Duration::from_secs(5))
                .build()
                .map_err(|e| {
                    CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
                })?;
            let timeout = Duration::from_millis(config.search.timeout_ms);
            Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
        }
        _ => None,
    };

    let url_filter_cfg =
        crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
    let metrics = crw_core::metrics::metrics();
    // (metric label, number of rules loaded for that category), built-in defaults
    // plus whatever the config adds.
    let rule_counts = [
        (
            "action",
            crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
                + url_filter_cfg.action_params.len(),
        ),
        (
            "tracking",
            crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
                + url_filter_cfg.tracking_params.len(),
        ),
        (
            "preserve",
            crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
                + url_filter_cfg.preserve_params.len(),
        ),
        ("host_override", url_filter_cfg.host_overrides.len()),
    ];
    for (label, count) in rule_counts {
        metrics
            .map_filter_rules_loaded
            .with_label_values(&[label])
            .inc_by(count as u64);
    }

    let state = Self {
        config: Arc::new(config),
        renderer: Arc::new(renderer),
        crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
        extract_jobs: Arc::new(RwLock::new(HashMap::new())),
        crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
        searxng,
        url_filter: Some(Arc::new(url_filter_cfg)),
    };

    let sweeper = state.clone();
    tokio::spawn(async move {
        let ttl = Duration::from_secs(sweeper.config.crawler.job_ttl_secs);
        loop {
            tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;

            // Crawl/batch jobs: drop only those that are finished AND past the TTL.
            {
                let mut crawl_jobs = sweeper.crawl_jobs.write().await;
                let before = crawl_jobs.len();
                crawl_jobs.retain(|_, job| {
                    let finished = matches!(
                        job.rx.borrow().status,
                        CrawlStatus::Completed | CrawlStatus::Failed
                    );
                    !(finished && job.created_at.elapsed() >= ttl)
                });
                let removed = before - crawl_jobs.len();
                if removed > 0 {
                    tracing::info!(
                        removed,
                        remaining = crawl_jobs.len(),
                        "Cleaned up expired crawl jobs"
                    );
                }
            }

            // Extract jobs: anything still processing is kept regardless of age.
            {
                let mut extract_jobs = sweeper.extract_jobs.write().await;
                extract_jobs.retain(|_, rec| {
                    rec.status == ExtractStatus::Processing || rec.created_at.elapsed() < ttl
                });
            }
        }
    });

    Ok(state)
}
/// Registers a new crawl job, spawns the task that runs it, and returns the job id.
///
/// The job starts in `CrawlStatus::InProgress`; progress is published on the watch
/// channel whose receiver is stored in `crawl_jobs`. The spawned task first takes a
/// permit from `crawl_semaphore` and then hands control to [`run_crawl`].
///
/// The task is spawned and the job is inserted under a single write-lock
/// acquisition, so the registry entry always carries its abort handle. No `.await`
/// sits between spawning and inserting, so dropping this future cannot leave a
/// registered job without a handle.
pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
    let id = Uuid::new_v4();
    let (tx, rx) = watch::channel(CrawlState {
        id,
        success: true,
        status: CrawlStatus::InProgress,
        total: 0,
        completed: 0,
        data: vec![],
        error: None,
    });

    // Snapshot everything the task needs so it does not borrow from `self`.
    let renderer = self.renderer.clone();
    let max_concurrency = self.config.crawler.max_concurrency;
    let respect_robots = self.config.crawler.respect_robots_txt;
    let rps = self.config.crawler.requests_per_second;
    let user_agent = self.config.crawler.user_agent.clone();
    let crawl_semaphore = self.crawl_semaphore.clone();
    let llm_config = self.config.extraction.llm.clone();
    let proxy = self.config.crawler.proxy.clone();
    let jitter_factor = self.config.crawler.stealth.jitter_factor;
    let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
    let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;

    {
        let mut jobs = self.crawl_jobs.write().await;
        // `tokio::spawn` does not await, so the guard is only held for the spawn
        // and the insert. The task itself never touches `crawl_jobs`.
        let handle = tokio::spawn(async move {
            // `acquire` waits for a free permit; it only fails if the semaphore
            // has been closed.
            let _permit = match crawl_semaphore.acquire().await {
                Ok(p) => p,
                Err(_) => {
                    let _ = tx.send(CrawlState {
                        id,
                        success: false,
                        status: CrawlStatus::Failed,
                        total: 0,
                        completed: 0,
                        data: vec![],
                        error: Some("Server is overloaded, try again later".into()),
                    });
                    return;
                }
            };
            run_crawl(CrawlOptions {
                id,
                req,
                renderer,
                max_concurrency,
                respect_robots,
                requests_per_second: rps,
                user_agent: &user_agent,
                state_tx: tx,
                llm_config: llm_config.as_ref(),
                proxy,
                jitter_factor,
                deadline_ms_per_page,
                per_host_max_concurrent,
            })
            .await;
        });
        jobs.insert(
            id,
            CrawlJob {
                rx,
                created_at: Instant::now(),
                abort_handle: Some(handle.abort_handle()),
            },
        );
    }

    id
}
pub async fn start_batch_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
let id = Uuid::new_v4();
let total = urls.len() as u32;
let (tx, rx) = watch::channel(CrawlState {
id,
success: true,
status: CrawlStatus::InProgress,
total,
completed: 0,
data: vec![],
error: None,
});
{
let mut jobs = self.crawl_jobs.write().await;
jobs.insert(
id,
CrawlJob {
rx,
created_at: Instant::now(),
abort_handle: None,
},
);
}
let renderer = self.renderer.clone();
let crawl_semaphore = self.crawl_semaphore.clone();
let config = self.config.clone();
let max_concurrency = config.crawler.max_concurrency.max(1);
let handle = tokio::spawn(async move {
let _permit = match crawl_semaphore.acquire().await {
Ok(p) => p,
Err(_) => {
let _ = tx.send(CrawlState {
id,
success: false,
status: CrawlStatus::Failed,
total,
completed: 0,
data: vec![],
error: Some("Server is overloaded, try again later".into()),
});
return;
}
};
if total == 0 {
let _ = tx.send(CrawlState {
id,
success: true,
status: CrawlStatus::Completed,
total: 0,
completed: 0,
data: vec![],
error: None,
});
return;
}
let user_agent = config.crawler.user_agent.clone();
let default_stealth =
config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
let render_js_default = config.renderer.render_js_default;
let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
let reqs: Vec<ScrapeRequest> = urls
.into_iter()
.map(|u| {
let mut r = template.clone();
r.url = u;
r
})
.collect();
futures::stream::iter(reqs)
.for_each_concurrent(max_concurrency, |req| {
let renderer = renderer.clone();
let config = config.clone();
let user_agent = user_agent.clone();
let tx = tx.clone();
async move {
let deadline = Deadline::from_request_ms(deadline_ms);
let scraped = scrape_url(
&req,
&renderer,
config.extraction.llm.as_ref(),
&config.extraction,
&user_agent,
default_stealth,
render_js_default,
deadline,
)
.await
.ok();
tx.send_modify(|st| {
if let Some(d) = scraped {
st.data.push(d);
}
st.completed += 1;
if st.completed >= total {
st.status = CrawlStatus::Completed;
}
});
}
})
.await;
});
{
let mut jobs = self.crawl_jobs.write().await;
if let Some(job) = jobs.get_mut(&id) {
job.abort_handle = Some(handle.abort_handle());
}
}
id
}
pub async fn start_extract_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
let id = Uuid::new_v4();
{
let mut jobs = self.extract_jobs.write().await;
jobs.insert(
id,
ExtractRecord {
status: ExtractStatus::Processing,
data: None,
tokens_used: 0,
credits_used: 0,
error: None,
created_at: Instant::now(),
},
);
}
let renderer = self.renderer.clone();
let config = self.config.clone();
let extract_jobs = self.extract_jobs.clone();
tokio::spawn(async move {
let user_agent = config.crawler.user_agent.clone();
let default_stealth =
config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
let render_js_default = config.renderer.render_js_default;
let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
let mut merged = serde_json::Map::new();
let mut tokens = 0u32;
let mut credits = 0u32;
let mut last_err: Option<String> = None;
let mut any_ok = false;
for u in urls {
let mut req = template.clone();
req.url = u;
let deadline = Deadline::from_request_ms(deadline_ms);
match scrape_url(
&req,
&renderer,
config.extraction.llm.as_ref(),
&config.extraction,
&user_agent,
default_stealth,
render_js_default,
deadline,
)
.await
{
Ok(d) => {
any_ok = true;
if let Some(serde_json::Value::Object(obj)) = d.json {
for (k, v) in obj {
merged.insert(k, v);
}
}
if let Some(usage) = d.llm_usage {
tokens += usage.total_tokens;
}
credits += if d.credit_cost == 0 { 1 } else { d.credit_cost };
}
Err(e) => last_err = Some(e.to_string()),
}
}
let mut jobs = extract_jobs.write().await;
if let Some(rec) = jobs.get_mut(&id) {
if !any_ok && last_err.is_some() {
rec.status = ExtractStatus::Failed;
rec.error = last_err;
} else {
rec.status = ExtractStatus::Completed;
rec.data = Some(serde_json::Value::Object(merged));
}
rec.tokens_used = tokens;
rec.credits_used = credits.max(1);
}
});
id
}
}