use crw_core::config::AppConfig;
use crw_core::error::{CrwError, CrwResult};
use crw_core::types::{
CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
resolve_render_js,
};
use crw_crawl::crawl::{CrawlOptions, run_crawl};
use crw_renderer::FallbackRenderer;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, watch};
use uuid::Uuid;
/// Checks that an explicitly pinned renderer can actually be used by this server.
///
/// * `renderer` - the renderer requested by the client, if any.
/// * `render_js` - the client's JS-rendering preference, if any.
/// * `state` - application state providing the server default and the renderer pool.
///
/// Returns `Ok(())` when `resolve_pinned_renderer` yields no name, or when JS
/// rendering resolves to `Some(false)` (the renderer names are not consulted then).
///
/// # Errors
/// Returns [`CrwError::InvalidRequest`] when the pinned name is not among
/// `state.renderer.js_renderer_names()`.
pub(crate) fn validate_renderer_pin(
    renderer: Option<RequestedRenderer>,
    render_js: Option<bool>,
    state: &AppState,
) -> CrwResult<()> {
    // Without a pinned renderer there is nothing to validate.
    let pinned = match resolve_pinned_renderer(renderer) {
        Some(pinned) => pinned,
        None => return Ok(()),
    };

    // With a pin present, an unspecified `render_js` is treated as `Some(true)`
    // before it is resolved against the server-wide default.
    let requested_js = render_js.or(Some(true));
    let js_disabled =
        resolve_render_js(requested_js, state.config.renderer.render_js_default) == Some(false);
    if js_disabled {
        return Ok(());
    }

    let configured = state.renderer.js_renderer_names();
    if configured.contains(&pinned) {
        return Ok(());
    }

    Err(CrwError::InvalidRequest(format!(
        "renderer '{}' not available; configured renderers: [{}]. \
         Update server config or omit the 'renderer' field.",
        pinned,
        configured.join(", ")
    )))
}
/// Validates the renderer pin carried by a crawl request.
///
/// Forwards the request's `renderer` and `render_js` fields to
/// `validate_renderer_pin` and returns its result unchanged.
pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
    let pinned = req.renderer;
    let wants_js = req.render_js;
    validate_renderer_pin(pinned, wants_js, state)
}
/// Bookkeeping entry for one crawl job stored in `AppState::crawl_jobs`.
pub struct CrawlJob {
/// Receiving side of the watch channel that carries the job's latest `CrawlState`.
pub rx: watch::Receiver<CrawlState>,
/// Moment the job was registered; the cleanup task measures the job TTL from here.
pub created_at: Instant,
/// Handle for aborting the spawned crawl task, when one has been attached.
pub abort_handle: Option<tokio::task::AbortHandle>,
}
/// Number of permits given to `AppState::crawl_semaphore`; each crawl task
/// acquires one permit before it starts crawling.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Pause between passes of the background task that evicts expired crawl jobs.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
/// Shared application state. Every field is an `Arc`, so cloning only bumps
/// reference counts and all clones observe the same jobs and semaphore.
#[derive(Clone)]
pub struct AppState {
/// Server configuration.
pub config: Arc<AppConfig>,
/// Renderer built from the configuration in `AppState::new`.
pub renderer: Arc<FallbackRenderer>,
/// Crawl jobs keyed by job id.
pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
/// Semaphore created with `MAX_CONCURRENT_CRAWLS` permits; crawl tasks
/// acquire a permit before running.
pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
}
impl AppState {
    /// Builds the shared state from `config` and starts the background job-cleanup task.
    ///
    /// The renderer is constructed from the renderer, user-agent, proxy and stealth
    /// settings and then given the request-rate and per-host concurrency limits from the
    /// crawler configuration.
    ///
    /// # Errors
    /// Propagates any error returned by `FallbackRenderer::new`.
    ///
    /// # Panics
    /// Calls `tokio::spawn`, so it must be invoked from within a Tokio runtime.
    pub fn new(config: AppConfig) -> CrwResult<Self> {
        let proxy = config.crawler.proxy.as_deref();
        let renderer = FallbackRenderer::new(
            &config.renderer,
            &config.crawler.user_agent,
            proxy,
            &config.crawler.stealth,
        )?
        .with_host_limits(
            config.crawler.requests_per_second,
            config.crawler.per_host_max_concurrent,
        );

        let state = Self {
            config: Arc::new(config),
            renderer: Arc::new(renderer),
            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
        };

        // Background janitor: wakes up every `JOB_CLEANUP_INTERVAL` and evicts stale jobs.
        let cleanup_state = state.clone();
        tokio::spawn(async move {
            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
            loop {
                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
                let mut jobs = cleanup_state.crawl_jobs.write().await;
                let before = jobs.len();
                // A job is evicted only when it has finished (Completed or Failed) AND its
                // age since creation has reached the TTL; unfinished jobs are always kept.
                jobs.retain(|_id, job| {
                    let is_done = matches!(
                        job.rx.borrow().status,
                        CrawlStatus::Completed | CrawlStatus::Failed
                    );
                    !is_done || job.created_at.elapsed() < ttl
                });
                let removed = before - jobs.len();
                if removed > 0 {
                    tracing::info!(
                        removed,
                        remaining = jobs.len(),
                        "Cleaned up expired crawl jobs"
                    );
                }
            }
        });
        Ok(state)
    }

    /// Registers a new crawl job, spawns the task that runs it and returns the job id.
    ///
    /// The job starts out as `CrawlStatus::InProgress`; updates are published through a
    /// watch channel whose receiver is stored in `crawl_jobs`. The spawned task first
    /// waits for a permit from `crawl_semaphore`, so at most `MAX_CONCURRENT_CRAWLS`
    /// crawls run at the same time while further jobs wait for a permit.
    ///
    /// The task is spawned and registered under a single write-lock acquisition. A job
    /// is therefore never visible in `crawl_jobs` without its abort handle, and dropping
    /// this future while it waits for the lock leaves nothing spawned or registered.
    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
        let id = Uuid::new_v4();
        let (tx, rx) = watch::channel(CrawlState {
            id,
            success: true,
            status: CrawlStatus::InProgress,
            total: 0,
            completed: 0,
            data: vec![],
            error: None,
        });

        // Snapshot everything the task needs up front: the spawned future must be
        // `'static`, so it cannot borrow from `self`.
        let renderer = self.renderer.clone();
        let max_concurrency = self.config.crawler.max_concurrency;
        let respect_robots = self.config.crawler.respect_robots_txt;
        let rps = self.config.crawler.requests_per_second;
        let user_agent = self.config.crawler.user_agent.clone();
        let crawl_semaphore = self.crawl_semaphore.clone();
        let llm_config = self.config.extraction.llm.clone();
        let proxy = self.config.crawler.proxy.clone();
        let jitter_factor = self.config.crawler.stealth.jitter_factor;
        let deadline_ms_per_page = self.config.request.deadline_ms_default;
        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;

        // This is the only await point in the function. Nothing has been spawned or
        // registered yet, so cancellation here is harmless.
        let mut jobs = self.crawl_jobs.write().await;

        // `tokio::spawn` does not await, so spawning while the guard is held lets the job
        // be inserted together with its abort handle.
        let handle = tokio::spawn(async move {
            // `acquire` fails only when the semaphore has been closed.
            let Ok(_permit) = crawl_semaphore.acquire().await else {
                let _ = tx.send(CrawlState {
                    id,
                    success: false,
                    status: CrawlStatus::Failed,
                    total: 0,
                    completed: 0,
                    data: vec![],
                    error: Some("Server is overloaded, try again later".into()),
                });
                return;
            };
            run_crawl(CrawlOptions {
                id,
                req,
                renderer,
                max_concurrency,
                respect_robots,
                requests_per_second: rps,
                user_agent: &user_agent,
                state_tx: tx,
                llm_config: llm_config.as_ref(),
                proxy,
                jitter_factor,
                deadline_ms_per_page,
                per_host_max_concurrent,
            })
            .await;
        });

        jobs.insert(
            id,
            CrawlJob {
                rx,
                created_at: Instant::now(),
                abort_handle: Some(handle.abort_handle()),
            },
        );
        drop(jobs);

        id
    }
}