1use crw_core::Deadline;
2use crw_core::config::AppConfig;
3use crw_core::error::{CrwError, CrwResult};
4use crw_core::types::{
5 CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, ScrapeRequest,
6 resolve_pinned_renderer, resolve_render_js,
7};
8use crw_crawl::crawl::{CrawlOptions, run_crawl};
9use crw_crawl::single::scrape_url;
10use crw_renderer::FallbackRenderer;
11use crw_search::SearxngClient;
12use futures::stream::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use uuid::Uuid;
18
19pub(crate) fn validate_renderer_pin(
30 renderer: Option<RequestedRenderer>,
31 render_js: Option<bool>,
32 state: &AppState,
33) -> CrwResult<()> {
34 let Some(name) = resolve_pinned_renderer(renderer) else {
35 return Ok(());
36 };
37
38 let effective_request = if render_js.is_none() {
44 Some(true)
45 } else {
46 render_js
47 };
48 let effective_render_js =
49 resolve_render_js(effective_request, state.config.renderer.render_js_default);
50
51 if effective_render_js == Some(false) {
52 return Ok(());
53 }
54
55 let available = state.renderer.js_renderer_names();
56 if !available.contains(&name) {
57 return Err(CrwError::InvalidRequest(format!(
58 "renderer '{}' not available; configured renderers: [{}]. \
59 Update server config or omit the 'renderer' field.",
60 name,
61 available.join(", ")
62 )));
63 }
64 Ok(())
65}
66
67pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
69 validate_renderer_pin(req.renderer, req.render_js, state)
70}
71
72pub struct CrawlJob {
74 pub rx: watch::Receiver<CrawlState>,
75 pub created_at: Instant,
76 pub abort_handle: Option<tokio::task::AbortHandle>,
78}
79
/// Number of permits in the semaphore that gates crawl and batch jobs.
const MAX_CONCURRENT_CRAWLS: usize = 10;

/// Pause between two passes of the background job-cleanup loop.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
84
/// Lifecycle state of an asynchronous extract job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtractStatus {
    /// The job has been registered and its result is not available yet.
    Processing,
    /// The job finished and produced a result.
    Completed,
    /// The job finished without a usable result.
    Failed,
}

impl ExtractStatus {
    /// Returns the lowercase label for this status
    /// (`"processing"`, `"completed"` or `"failed"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
102
103#[derive(Debug, Clone)]
107pub struct ExtractRecord {
108 pub status: ExtractStatus,
109 pub data: Option<serde_json::Value>,
110 pub tokens_used: u32,
111 pub credits_used: u32,
112 pub error: Option<String>,
113 pub created_at: Instant,
114}
115
116#[derive(Clone)]
118pub struct AppState {
119 pub config: Arc<AppConfig>,
120 pub renderer: Arc<FallbackRenderer>,
121 pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
122 pub extract_jobs: Arc<RwLock<HashMap<Uuid, ExtractRecord>>>,
125 pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
126 pub searxng: Option<Arc<SearxngClient>>,
129 pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
133}
134
135impl AppState {
136 pub fn new(config: AppConfig) -> CrwResult<Self> {
137 let proxy = config.crawler.proxy.as_deref();
138 let renderer = FallbackRenderer::new(
139 &config.renderer,
140 &config.crawler.user_agent,
141 proxy,
142 &config.crawler.stealth,
143 )?
144 .with_host_limits(
145 config.crawler.requests_per_second,
146 config.crawler.per_host_max_concurrent,
147 );
148
149 let searxng = if config.search.enabled
150 && let Some(url) = config.search.searxng_url.as_ref()
151 {
152 let http = reqwest::Client::builder()
157 .connect_timeout(Duration::from_secs(5))
158 .build()
159 .map_err(|e| {
160 CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
161 })?;
162 let timeout = Duration::from_millis(config.search.timeout_ms);
163 Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
164 } else {
165 None
166 };
167
168 let url_filter_cfg =
169 crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
170 let m = crw_core::metrics::metrics();
173 m.map_filter_rules_loaded
174 .with_label_values(&["action"])
175 .inc_by(
176 (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
177 + url_filter_cfg.action_params.len()) as u64,
178 );
179 m.map_filter_rules_loaded
180 .with_label_values(&["tracking"])
181 .inc_by(
182 (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
183 + url_filter_cfg.tracking_params.len()) as u64,
184 );
185 m.map_filter_rules_loaded
186 .with_label_values(&["preserve"])
187 .inc_by(
188 (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
189 + url_filter_cfg.preserve_params.len()) as u64,
190 );
191 m.map_filter_rules_loaded
192 .with_label_values(&["host_override"])
193 .inc_by(url_filter_cfg.host_overrides.len() as u64);
194 let url_filter = Some(Arc::new(url_filter_cfg));
195
196 let state = Self {
197 config: Arc::new(config),
198 renderer: Arc::new(renderer),
199 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
200 extract_jobs: Arc::new(RwLock::new(HashMap::new())),
201 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
202 searxng,
203 url_filter,
204 };
205
206 let cleanup_state = state.clone();
209 tokio::spawn(async move {
210 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
211 loop {
212 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
213 let mut jobs = cleanup_state.crawl_jobs.write().await;
214 let before = jobs.len();
215 jobs.retain(|_id, job| {
216 let is_done = matches!(
217 job.rx.borrow().status,
218 CrawlStatus::Completed | CrawlStatus::Failed
219 );
220 !is_done || job.created_at.elapsed() < ttl
222 });
223 let removed = before - jobs.len();
224 if removed > 0 {
225 tracing::info!(
226 removed,
227 remaining = jobs.len(),
228 "Cleaned up expired crawl jobs"
229 );
230 }
231 drop(jobs);
232
233 let mut ejobs = cleanup_state.extract_jobs.write().await;
235 ejobs.retain(|_id, rec| {
236 matches!(rec.status, ExtractStatus::Processing)
237 || rec.created_at.elapsed() < ttl
238 });
239 }
240 });
241
242 Ok(state)
243 }
244
245 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
248 let id = Uuid::new_v4();
249 let initial = CrawlState {
250 id,
251 success: true,
252 status: CrawlStatus::InProgress,
253 total: 0,
254 completed: 0,
255 data: vec![],
256 error: None,
257 };
258
259 let (tx, rx) = watch::channel(initial);
260
261 {
262 let mut jobs = self.crawl_jobs.write().await;
263 jobs.insert(
264 id,
265 CrawlJob {
266 rx,
267 created_at: Instant::now(),
268 abort_handle: None,
269 },
270 );
271 }
272
273 let renderer = self.renderer.clone();
274 let max_concurrency = self.config.crawler.max_concurrency;
275 let respect_robots = self.config.crawler.respect_robots_txt;
276 let rps = self.config.crawler.requests_per_second;
277 let user_agent = self.config.crawler.user_agent.clone();
278 let crawl_semaphore = self.crawl_semaphore.clone();
279 let llm_config = self.config.extraction.llm.clone();
280 let proxy = self.config.crawler.proxy.clone();
281 let jitter_factor = self.config.crawler.stealth.jitter_factor;
282 let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
283 let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
284
285 let handle = tokio::spawn(async move {
286 let _permit = match crawl_semaphore.acquire().await {
287 Ok(p) => p,
288 Err(_) => {
289 let _ = tx.send(CrawlState {
290 id,
291 success: false,
292 status: CrawlStatus::Failed,
293 total: 0,
294 completed: 0,
295 data: vec![],
296 error: Some("Server is overloaded, try again later".into()),
297 });
298 return;
299 }
300 };
301 run_crawl(CrawlOptions {
302 id,
303 req,
304 renderer,
305 max_concurrency,
306 respect_robots,
307 requests_per_second: rps,
308 user_agent: &user_agent,
309 state_tx: tx,
310 llm_config: llm_config.as_ref(),
311 proxy,
312 jitter_factor,
313 deadline_ms_per_page,
314 per_host_max_concurrent,
315 })
316 .await;
317 });
318
319 {
321 let mut jobs = self.crawl_jobs.write().await;
322 if let Some(job) = jobs.get_mut(&id) {
323 job.abort_handle = Some(handle.abort_handle());
324 }
325 }
326
327 id
328 }
329
330 pub async fn start_batch_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
335 let id = Uuid::new_v4();
336 let total = urls.len() as u32;
337 let (tx, rx) = watch::channel(CrawlState {
338 id,
339 success: true,
340 status: CrawlStatus::InProgress,
341 total,
342 completed: 0,
343 data: vec![],
344 error: None,
345 });
346 {
347 let mut jobs = self.crawl_jobs.write().await;
348 jobs.insert(
349 id,
350 CrawlJob {
351 rx,
352 created_at: Instant::now(),
353 abort_handle: None,
354 },
355 );
356 }
357
358 let renderer = self.renderer.clone();
359 let crawl_semaphore = self.crawl_semaphore.clone();
360 let config = self.config.clone();
361 let max_concurrency = config.crawler.max_concurrency.max(1);
362
363 let handle = tokio::spawn(async move {
364 let _permit = match crawl_semaphore.acquire().await {
365 Ok(p) => p,
366 Err(_) => {
367 let _ = tx.send(CrawlState {
368 id,
369 success: false,
370 status: CrawlStatus::Failed,
371 total,
372 completed: 0,
373 data: vec![],
374 error: Some("Server is overloaded, try again later".into()),
375 });
376 return;
377 }
378 };
379
380 if total == 0 {
381 let _ = tx.send(CrawlState {
382 id,
383 success: true,
384 status: CrawlStatus::Completed,
385 total: 0,
386 completed: 0,
387 data: vec![],
388 error: None,
389 });
390 return;
391 }
392
393 let user_agent = config.crawler.user_agent.clone();
394 let default_stealth =
395 config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
396 let render_js_default = config.renderer.render_js_default;
397 let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
398
399 let reqs: Vec<ScrapeRequest> = urls
400 .into_iter()
401 .map(|u| {
402 let mut r = template.clone();
403 r.url = u;
404 r
405 })
406 .collect();
407
408 futures::stream::iter(reqs)
409 .for_each_concurrent(max_concurrency, |req| {
410 let renderer = renderer.clone();
411 let config = config.clone();
412 let user_agent = user_agent.clone();
413 let tx = tx.clone();
414 async move {
415 let deadline = Deadline::from_request_ms(deadline_ms);
416 let scraped = scrape_url(
417 &req,
418 &renderer,
419 config.extraction.llm.as_ref(),
420 &config.extraction,
421 &user_agent,
422 default_stealth,
423 render_js_default,
424 deadline,
425 )
426 .await
427 .ok();
428 tx.send_modify(|st| {
433 if let Some(d) = scraped {
434 st.data.push(d);
435 }
436 st.completed += 1;
437 if st.completed >= total {
438 st.status = CrawlStatus::Completed;
439 }
440 });
441 }
442 })
443 .await;
444 });
445
446 {
447 let mut jobs = self.crawl_jobs.write().await;
448 if let Some(job) = jobs.get_mut(&id) {
449 job.abort_handle = Some(handle.abort_handle());
450 }
451 }
452
453 id
454 }
455
456 pub async fn start_extract_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
460 let id = Uuid::new_v4();
461 {
462 let mut jobs = self.extract_jobs.write().await;
463 jobs.insert(
464 id,
465 ExtractRecord {
466 status: ExtractStatus::Processing,
467 data: None,
468 tokens_used: 0,
469 credits_used: 0,
470 error: None,
471 created_at: Instant::now(),
472 },
473 );
474 }
475
476 let renderer = self.renderer.clone();
477 let config = self.config.clone();
478 let extract_jobs = self.extract_jobs.clone();
479
480 tokio::spawn(async move {
481 let user_agent = config.crawler.user_agent.clone();
482 let default_stealth =
483 config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
484 let render_js_default = config.renderer.render_js_default;
485 let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
486
487 let mut merged = serde_json::Map::new();
488 let mut tokens = 0u32;
489 let mut credits = 0u32;
490 let mut last_err: Option<String> = None;
491 let mut any_ok = false;
492
493 for u in urls {
494 let mut req = template.clone();
495 req.url = u;
496 let deadline = Deadline::from_request_ms(deadline_ms);
497 match scrape_url(
498 &req,
499 &renderer,
500 config.extraction.llm.as_ref(),
501 &config.extraction,
502 &user_agent,
503 default_stealth,
504 render_js_default,
505 deadline,
506 )
507 .await
508 {
509 Ok(d) => {
510 any_ok = true;
511 if let Some(serde_json::Value::Object(obj)) = d.json {
512 for (k, v) in obj {
513 merged.insert(k, v);
514 }
515 }
516 if let Some(usage) = d.llm_usage {
517 tokens += usage.total_tokens;
518 }
519 credits += if d.credit_cost == 0 { 1 } else { d.credit_cost };
520 }
521 Err(e) => last_err = Some(e.to_string()),
522 }
523 }
524
525 let mut jobs = extract_jobs.write().await;
526 if let Some(rec) = jobs.get_mut(&id) {
527 if !any_ok && last_err.is_some() {
528 rec.status = ExtractStatus::Failed;
529 rec.error = last_err;
530 } else {
531 rec.status = ExtractStatus::Completed;
532 rec.data = Some(serde_json::Value::Object(merged));
533 }
534 rec.tokens_used = tokens;
535 rec.credits_used = credits.max(1);
536 }
537 });
538
539 id
540 }
541}