1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4 CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5 resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use crw_search::SearxngClient;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{RwLock, watch};
14use uuid::Uuid;
15
16pub(crate) fn validate_renderer_pin(
27 renderer: Option<RequestedRenderer>,
28 render_js: Option<bool>,
29 state: &AppState,
30) -> CrwResult<()> {
31 let Some(name) = resolve_pinned_renderer(renderer) else {
32 return Ok(());
33 };
34
35 let effective_request = if render_js.is_none() {
41 Some(true)
42 } else {
43 render_js
44 };
45 let effective_render_js =
46 resolve_render_js(effective_request, state.config.renderer.render_js_default);
47
48 if effective_render_js == Some(false) {
49 return Ok(());
50 }
51
52 let available = state.renderer.js_renderer_names();
53 if !available.contains(&name) {
54 return Err(CrwError::InvalidRequest(format!(
55 "renderer '{}' not available; configured renderers: [{}]. \
56 Update server config or omit the 'renderer' field.",
57 name,
58 available.join(", ")
59 )));
60 }
61 Ok(())
62}
63
64pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
66 validate_renderer_pin(req.renderer, req.render_js, state)
67}
68
69pub struct CrawlJob {
71 pub rx: watch::Receiver<CrawlState>,
72 pub created_at: Instant,
73 pub abort_handle: Option<tokio::task::AbortHandle>,
75}
76
/// Number of permits in the semaphore that gates concurrently running crawls.
const MAX_CONCURRENT_CRAWLS: usize = 10;

/// Pause between two passes of the background crawl-job cleanup task.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
81
82#[derive(Clone)]
84pub struct AppState {
85 pub config: Arc<AppConfig>,
86 pub renderer: Arc<FallbackRenderer>,
87 pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
88 pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
89 pub searxng: Option<Arc<SearxngClient>>,
92 pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
96}
97
98impl AppState {
99 pub fn new(config: AppConfig) -> CrwResult<Self> {
100 let proxy = config.crawler.proxy.as_deref();
101 let renderer = FallbackRenderer::new(
102 &config.renderer,
103 &config.crawler.user_agent,
104 proxy,
105 &config.crawler.stealth,
106 )?
107 .with_host_limits(
108 config.crawler.requests_per_second,
109 config.crawler.per_host_max_concurrent,
110 );
111
112 let searxng = if config.search.enabled
113 && let Some(url) = config.search.searxng_url.as_ref()
114 {
115 let http = reqwest::Client::builder()
120 .connect_timeout(Duration::from_secs(5))
121 .build()
122 .map_err(|e| {
123 CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
124 })?;
125 let timeout = Duration::from_millis(config.search.timeout_ms);
126 Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
127 } else {
128 None
129 };
130
131 let url_filter_cfg =
132 crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
133 let m = crw_core::metrics::metrics();
136 m.map_filter_rules_loaded
137 .with_label_values(&["action"])
138 .inc_by(
139 (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
140 + url_filter_cfg.action_params.len()) as u64,
141 );
142 m.map_filter_rules_loaded
143 .with_label_values(&["tracking"])
144 .inc_by(
145 (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
146 + url_filter_cfg.tracking_params.len()) as u64,
147 );
148 m.map_filter_rules_loaded
149 .with_label_values(&["preserve"])
150 .inc_by(
151 (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
152 + url_filter_cfg.preserve_params.len()) as u64,
153 );
154 m.map_filter_rules_loaded
155 .with_label_values(&["host_override"])
156 .inc_by(url_filter_cfg.host_overrides.len() as u64);
157 let url_filter = Some(Arc::new(url_filter_cfg));
158
159 let state = Self {
160 config: Arc::new(config),
161 renderer: Arc::new(renderer),
162 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
163 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
164 searxng,
165 url_filter,
166 };
167
168 let cleanup_state = state.clone();
171 tokio::spawn(async move {
172 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
173 loop {
174 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
175 let mut jobs = cleanup_state.crawl_jobs.write().await;
176 let before = jobs.len();
177 jobs.retain(|_id, job| {
178 let is_done = matches!(
179 job.rx.borrow().status,
180 CrawlStatus::Completed | CrawlStatus::Failed
181 );
182 !is_done || job.created_at.elapsed() < ttl
184 });
185 let removed = before - jobs.len();
186 if removed > 0 {
187 tracing::info!(
188 removed,
189 remaining = jobs.len(),
190 "Cleaned up expired crawl jobs"
191 );
192 }
193 }
194 });
195
196 Ok(state)
197 }
198
199 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
202 let id = Uuid::new_v4();
203 let initial = CrawlState {
204 id,
205 success: true,
206 status: CrawlStatus::InProgress,
207 total: 0,
208 completed: 0,
209 data: vec![],
210 error: None,
211 };
212
213 let (tx, rx) = watch::channel(initial);
214
215 {
216 let mut jobs = self.crawl_jobs.write().await;
217 jobs.insert(
218 id,
219 CrawlJob {
220 rx,
221 created_at: Instant::now(),
222 abort_handle: None,
223 },
224 );
225 }
226
227 let renderer = self.renderer.clone();
228 let max_concurrency = self.config.crawler.max_concurrency;
229 let respect_robots = self.config.crawler.respect_robots_txt;
230 let rps = self.config.crawler.requests_per_second;
231 let user_agent = self.config.crawler.user_agent.clone();
232 let crawl_semaphore = self.crawl_semaphore.clone();
233 let llm_config = self.config.extraction.llm.clone();
234 let proxy = self.config.crawler.proxy.clone();
235 let jitter_factor = self.config.crawler.stealth.jitter_factor;
236 let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
237 let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
238
239 let handle = tokio::spawn(async move {
240 let _permit = match crawl_semaphore.acquire().await {
241 Ok(p) => p,
242 Err(_) => {
243 let _ = tx.send(CrawlState {
244 id,
245 success: false,
246 status: CrawlStatus::Failed,
247 total: 0,
248 completed: 0,
249 data: vec![],
250 error: Some("Server is overloaded, try again later".into()),
251 });
252 return;
253 }
254 };
255 run_crawl(CrawlOptions {
256 id,
257 req,
258 renderer,
259 max_concurrency,
260 respect_robots,
261 requests_per_second: rps,
262 user_agent: &user_agent,
263 state_tx: tx,
264 llm_config: llm_config.as_ref(),
265 proxy,
266 jitter_factor,
267 deadline_ms_per_page,
268 per_host_max_concurrent,
269 })
270 .await;
271 });
272
273 {
275 let mut jobs = self.crawl_jobs.write().await;
276 if let Some(job) = jobs.get_mut(&id) {
277 job.abort_handle = Some(handle.abort_handle());
278 }
279 }
280
281 id
282 }
283}