1use crw_core::Deadline;
2use crw_core::config::AppConfig;
3use crw_core::error::{CrwError, CrwResult};
4use crw_core::types::{
5 CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, ScrapeRequest,
6 resolve_pinned_renderer, resolve_render_js,
7};
8use crw_crawl::crawl::{CrawlOptions, run_crawl};
9use crw_crawl::single::scrape_url;
10use crw_renderer::FallbackRenderer;
11use crw_search::SearxngClient;
12use futures::stream::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use uuid::Uuid;
18
19pub(crate) fn validate_renderer_pin(
30 renderer: Option<RequestedRenderer>,
31 render_js: Option<bool>,
32 state: &AppState,
33) -> CrwResult<()> {
34 let Some(name) = resolve_pinned_renderer(renderer) else {
35 return Ok(());
36 };
37
38 let effective_request = if render_js.is_none() {
44 Some(true)
45 } else {
46 render_js
47 };
48 let effective_render_js =
49 resolve_render_js(effective_request, state.config.renderer.render_js_default);
50
51 if effective_render_js == Some(false) {
52 return Ok(());
53 }
54
55 let available = state.renderer.js_renderer_names();
56 if !available.contains(&name) {
57 return Err(CrwError::InvalidRequest(format!(
58 "renderer '{}' not available; configured renderers: [{}]. \
59 Update server config or omit the 'renderer' field.",
60 name,
61 available.join(", ")
62 )));
63 }
64 Ok(())
65}
66
67pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
69 validate_renderer_pin(req.renderer, req.render_js, state)
70}
71
72pub struct CrawlJob {
74 pub rx: watch::Receiver<CrawlState>,
75 pub created_at: Instant,
76 pub abort_handle: Option<tokio::task::AbortHandle>,
78}
79
/// Number of permits in `AppState::crawl_semaphore`, i.e. how many crawl or
/// batch jobs may run at the same time.
const MAX_CONCURRENT_CRAWLS: usize = 10;

/// Pause between two passes of the background job-cleanup task.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
84
/// Lifecycle state of an extract job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtractStatus {
    /// The job task has not written a final result yet.
    Processing,
    /// The job finished and produced data.
    Completed,
    /// The job finished without a single successful scrape.
    Failed,
}

impl ExtractStatus {
    /// Returns the lowercase string form of the status.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
102
103#[derive(Debug, Clone)]
107pub struct ExtractRecord {
108 pub status: ExtractStatus,
109 pub data: Option<serde_json::Value>,
110 pub tokens_used: u32,
111 pub credits_used: u32,
112 pub error: Option<String>,
113 pub created_at: Instant,
114}
115
116#[derive(Clone)]
118pub struct AppState {
119 pub config: Arc<AppConfig>,
120 pub renderer: Arc<FallbackRenderer>,
121 pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
122 pub extract_jobs: Arc<RwLock<HashMap<Uuid, ExtractRecord>>>,
125 pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
126 pub searxng: Option<Arc<SearxngClient>>,
129 pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
133}
134
135impl AppState {
136 pub fn new(config: AppConfig) -> CrwResult<Self> {
137 let proxy_rotator = crw_core::ProxyRotator::build(
143 &config.crawler.proxy_list,
144 config.crawler.proxy.as_deref(),
145 config.crawler.proxy_rotation,
146 )
147 .map_err(CrwError::ConfigError)?
148 .map(Arc::new);
149 let renderer = FallbackRenderer::new(
150 &config.renderer,
151 &config.crawler.user_agent,
152 None,
153 &config.crawler.stealth,
154 )?
155 .with_proxy_rotator(proxy_rotator)?
156 .with_host_limits(
157 config.crawler.requests_per_second,
158 config.crawler.per_host_max_concurrent,
159 );
160
161 let searxng = if config.search.enabled
162 && let Some(url) = config.search.searxng_url.as_ref()
163 {
164 let http = reqwest::Client::builder()
169 .connect_timeout(Duration::from_secs(5))
170 .build()
171 .map_err(|e| {
172 CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
173 })?;
174 let timeout = Duration::from_millis(config.search.timeout_ms);
175 Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
176 } else {
177 None
178 };
179
180 let url_filter_cfg =
181 crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
182 let m = crw_core::metrics::metrics();
185 m.map_filter_rules_loaded
186 .with_label_values(&["action"])
187 .inc_by(
188 (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
189 + url_filter_cfg.action_params.len()) as u64,
190 );
191 m.map_filter_rules_loaded
192 .with_label_values(&["tracking"])
193 .inc_by(
194 (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
195 + url_filter_cfg.tracking_params.len()) as u64,
196 );
197 m.map_filter_rules_loaded
198 .with_label_values(&["preserve"])
199 .inc_by(
200 (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
201 + url_filter_cfg.preserve_params.len()) as u64,
202 );
203 m.map_filter_rules_loaded
204 .with_label_values(&["host_override"])
205 .inc_by(url_filter_cfg.host_overrides.len() as u64);
206 let url_filter = Some(Arc::new(url_filter_cfg));
207
208 let state = Self {
209 config: Arc::new(config),
210 renderer: Arc::new(renderer),
211 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
212 extract_jobs: Arc::new(RwLock::new(HashMap::new())),
213 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
214 searxng,
215 url_filter,
216 };
217
218 let cleanup_state = state.clone();
221 tokio::spawn(async move {
222 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
223 loop {
224 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
225 let mut jobs = cleanup_state.crawl_jobs.write().await;
226 let before = jobs.len();
227 jobs.retain(|_id, job| {
228 let is_done = matches!(
229 job.rx.borrow().status,
230 CrawlStatus::Completed | CrawlStatus::Failed
231 );
232 !is_done || job.created_at.elapsed() < ttl
234 });
235 let removed = before - jobs.len();
236 if removed > 0 {
237 tracing::info!(
238 removed,
239 remaining = jobs.len(),
240 "Cleaned up expired crawl jobs"
241 );
242 }
243 drop(jobs);
244
245 let mut ejobs = cleanup_state.extract_jobs.write().await;
247 ejobs.retain(|_id, rec| {
248 matches!(rec.status, ExtractStatus::Processing)
249 || rec.created_at.elapsed() < ttl
250 });
251 }
252 });
253
254 Ok(state)
255 }
256
257 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
260 let id = Uuid::new_v4();
261 let initial = CrawlState {
262 id,
263 success: true,
264 status: CrawlStatus::InProgress,
265 total: 0,
266 completed: 0,
267 data: vec![],
268 error: None,
269 };
270
271 let (tx, rx) = watch::channel(initial);
272
273 {
274 let mut jobs = self.crawl_jobs.write().await;
275 jobs.insert(
276 id,
277 CrawlJob {
278 rx,
279 created_at: Instant::now(),
280 abort_handle: None,
281 },
282 );
283 }
284
285 let renderer = self.renderer.clone();
286 let max_concurrency = self.config.crawler.max_concurrency;
287 let respect_robots = self.config.crawler.respect_robots_txt;
288 let rps = self.config.crawler.requests_per_second;
289 let user_agent = self.config.crawler.user_agent.clone();
290 let crawl_semaphore = self.crawl_semaphore.clone();
291 let llm_config = self.config.extraction.llm.clone();
292 let proxy = self.config.crawler.proxy.clone();
293 let jitter_factor = self.config.crawler.stealth.jitter_factor;
294 let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
295 let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
296
297 let handle = tokio::spawn(async move {
298 let _permit = match crawl_semaphore.acquire().await {
299 Ok(p) => p,
300 Err(_) => {
301 let _ = tx.send(CrawlState {
302 id,
303 success: false,
304 status: CrawlStatus::Failed,
305 total: 0,
306 completed: 0,
307 data: vec![],
308 error: Some("Server is overloaded, try again later".into()),
309 });
310 return;
311 }
312 };
313 run_crawl(CrawlOptions {
314 id,
315 req,
316 renderer,
317 max_concurrency,
318 respect_robots,
319 requests_per_second: rps,
320 user_agent: &user_agent,
321 state_tx: tx,
322 llm_config: llm_config.as_ref(),
323 proxy,
324 jitter_factor,
325 deadline_ms_per_page,
326 per_host_max_concurrent,
327 })
328 .await;
329 });
330
331 {
333 let mut jobs = self.crawl_jobs.write().await;
334 if let Some(job) = jobs.get_mut(&id) {
335 job.abort_handle = Some(handle.abort_handle());
336 }
337 }
338
339 id
340 }
341
342 pub async fn start_batch_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
347 let id = Uuid::new_v4();
348 let total = urls.len() as u32;
349 let (tx, rx) = watch::channel(CrawlState {
350 id,
351 success: true,
352 status: CrawlStatus::InProgress,
353 total,
354 completed: 0,
355 data: vec![],
356 error: None,
357 });
358 {
359 let mut jobs = self.crawl_jobs.write().await;
360 jobs.insert(
361 id,
362 CrawlJob {
363 rx,
364 created_at: Instant::now(),
365 abort_handle: None,
366 },
367 );
368 }
369
370 let renderer = self.renderer.clone();
371 let crawl_semaphore = self.crawl_semaphore.clone();
372 let config = self.config.clone();
373 let max_concurrency = config.crawler.max_concurrency.max(1);
374
375 let handle = tokio::spawn(async move {
376 let _permit = match crawl_semaphore.acquire().await {
377 Ok(p) => p,
378 Err(_) => {
379 let _ = tx.send(CrawlState {
380 id,
381 success: false,
382 status: CrawlStatus::Failed,
383 total,
384 completed: 0,
385 data: vec![],
386 error: Some("Server is overloaded, try again later".into()),
387 });
388 return;
389 }
390 };
391
392 if total == 0 {
393 let _ = tx.send(CrawlState {
394 id,
395 success: true,
396 status: CrawlStatus::Completed,
397 total: 0,
398 completed: 0,
399 data: vec![],
400 error: None,
401 });
402 return;
403 }
404
405 let user_agent = config.crawler.user_agent.clone();
406 let default_stealth =
407 config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
408 let render_js_default = config.renderer.render_js_default;
409 let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
410
411 let reqs: Vec<ScrapeRequest> = urls
412 .into_iter()
413 .map(|u| {
414 let mut r = template.clone();
415 r.url = u;
416 r
417 })
418 .collect();
419
420 futures::stream::iter(reqs)
421 .for_each_concurrent(max_concurrency, |req| {
422 let renderer = renderer.clone();
423 let config = config.clone();
424 let user_agent = user_agent.clone();
425 let tx = tx.clone();
426 async move {
427 let deadline = Deadline::from_request_ms(deadline_ms);
428 let scraped = scrape_url(
429 &req,
430 &renderer,
431 config.extraction.llm.as_ref(),
432 &config.extraction,
433 &user_agent,
434 default_stealth,
435 render_js_default,
436 deadline,
437 )
438 .await
439 .ok();
440 tx.send_modify(|st| {
445 if let Some(d) = scraped {
446 st.data.push(d);
447 }
448 st.completed += 1;
449 if st.completed >= total {
450 st.status = CrawlStatus::Completed;
451 }
452 });
453 }
454 })
455 .await;
456 });
457
458 {
459 let mut jobs = self.crawl_jobs.write().await;
460 if let Some(job) = jobs.get_mut(&id) {
461 job.abort_handle = Some(handle.abort_handle());
462 }
463 }
464
465 id
466 }
467
468 pub async fn start_extract_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
472 let id = Uuid::new_v4();
473 {
474 let mut jobs = self.extract_jobs.write().await;
475 jobs.insert(
476 id,
477 ExtractRecord {
478 status: ExtractStatus::Processing,
479 data: None,
480 tokens_used: 0,
481 credits_used: 0,
482 error: None,
483 created_at: Instant::now(),
484 },
485 );
486 }
487
488 let renderer = self.renderer.clone();
489 let config = self.config.clone();
490 let extract_jobs = self.extract_jobs.clone();
491
492 tokio::spawn(async move {
493 let user_agent = config.crawler.user_agent.clone();
494 let default_stealth =
495 config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
496 let render_js_default = config.renderer.render_js_default;
497 let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
498
499 let mut merged = serde_json::Map::new();
500 let mut tokens = 0u32;
501 let mut credits = 0u32;
502 let mut last_err: Option<String> = None;
503 let mut any_ok = false;
504
505 for u in urls {
506 let mut req = template.clone();
507 req.url = u;
508 let deadline = Deadline::from_request_ms(deadline_ms);
509 match scrape_url(
510 &req,
511 &renderer,
512 config.extraction.llm.as_ref(),
513 &config.extraction,
514 &user_agent,
515 default_stealth,
516 render_js_default,
517 deadline,
518 )
519 .await
520 {
521 Ok(d) => {
522 any_ok = true;
523 if let Some(serde_json::Value::Object(obj)) = d.json {
524 for (k, v) in obj {
525 merged.insert(k, v);
526 }
527 }
528 if let Some(usage) = d.llm_usage {
529 tokens += usage.total_tokens;
530 }
531 credits += if d.credit_cost == 0 { 1 } else { d.credit_cost };
532 }
533 Err(e) => last_err = Some(e.to_string()),
534 }
535 }
536
537 let mut jobs = extract_jobs.write().await;
538 if let Some(rec) = jobs.get_mut(&id) {
539 if !any_ok && last_err.is_some() {
540 rec.status = ExtractStatus::Failed;
541 rec.error = last_err;
542 } else {
543 rec.status = ExtractStatus::Completed;
544 rec.data = Some(serde_json::Value::Object(merged));
545 }
546 rec.tokens_used = tokens;
547 rec.credits_used = credits.max(1);
548 }
549 });
550
551 id
552 }
553}