Skip to main content

crw_server/
state.rs

1use crw_core::Deadline;
2use crw_core::config::AppConfig;
3use crw_core::error::{CrwError, CrwResult};
4use crw_core::types::{
5    CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, ScrapeRequest,
6    resolve_pinned_renderer, resolve_render_js,
7};
8use crw_crawl::crawl::{CrawlOptions, run_crawl};
9use crw_crawl::single::scrape_url;
10use crw_renderer::FallbackRenderer;
11use crw_search::SearxngClient;
12use futures::stream::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use uuid::Uuid;
18
19/// Validate that a request's pinned renderer is available before accepting
20/// the job. Returns `InvalidRequest` (→ HTTP 400) when the named renderer is
21/// not in the configured pool. Skipped when `renderJs:false` is set, since
22/// HTTP-only ignores the pin.
23///
24/// We surface this explicitly (rather than silently falling back to "auto")
25/// so users get clear feedback when they ask for a renderer the operator
26/// hasn't configured. Sites that fail under one renderer often need a
27/// specific other one — silent fallback would leave callers wondering why
28/// "chrome" gave them the same broken result as "auto".
29pub(crate) fn validate_renderer_pin(
30    renderer: Option<RequestedRenderer>,
31    render_js: Option<bool>,
32    state: &AppState,
33) -> CrwResult<()> {
34    let Some(name) = resolve_pinned_renderer(renderer) else {
35        return Ok(());
36    };
37
38    // Mirror the fetch-path resolution at `crw-crawl/src/single.rs:41-50` so
39    // validation is consistent with what the actual request does. "Pinned
40    // implies JS" — when a renderer is pinned and the request omits
41    // `renderJs`, force the request to JS=true so a `render_js_default=false`
42    // server config doesn't silently send the request through HTTP-only.
43    let effective_request = if render_js.is_none() {
44        Some(true)
45    } else {
46        render_js
47    };
48    let effective_render_js =
49        resolve_render_js(effective_request, state.config.renderer.render_js_default);
50
51    if effective_render_js == Some(false) {
52        return Ok(());
53    }
54
55    let available = state.renderer.js_renderer_names();
56    if !available.contains(&name) {
57        return Err(CrwError::InvalidRequest(format!(
58            "renderer '{}' not available; configured renderers: [{}]. \
59             Update server config or omit the 'renderer' field.",
60            name,
61            available.join(", ")
62        )));
63    }
64    Ok(())
65}
66
67/// Crawl-specific wrapper around [`validate_renderer_pin`].
68pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
69    validate_renderer_pin(req.renderer, req.render_js, state)
70}
71
/// Tracks a crawl job receiver + creation time for TTL cleanup.
///
/// Entries live in `AppState::crawl_jobs`; both crawl jobs and
/// `/v2/batch/scrape` jobs are registered as `CrawlJob`s.
pub struct CrawlJob {
    /// Receiving half of the job's `watch` channel; the background task
    /// publishes the latest [`CrawlState`] through the sending half.
    pub rx: watch::Receiver<CrawlState>,
    /// When the job was registered. The cleanup loop compares this (not the
    /// completion time) against the configured job TTL.
    pub created_at: Instant,
    /// Handle to abort the crawl task. `None` when no task handle has been
    /// recorded for the job.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
79
/// Maximum number of concurrent crawl jobs.
///
/// Used as the permit count of `AppState::crawl_semaphore`, which crawl and
/// batch-scrape tasks acquire before doing any work.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Interval between expired crawl job cleanup runs.
///
/// The background cleanup task sleeps for this long before each pass over the
/// crawl and extract job maps.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
84
/// State of an asynchronous `/v2/extract` job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtractStatus {
    /// The job has been registered and has not finished yet.
    Processing,
    /// The job finished and produced a (possibly empty) merged object.
    Completed,
    /// The job finished without a single successful scrape.
    Failed,
}

impl ExtractStatus {
    /// Lower-case name of the status: `"processing"`, `"completed"` or
    /// `"failed"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
102
/// A `/v2/extract` job record. `data` is the single merged JSON object (the
/// scrape's `json` field unioned across URLs), matching the live API's
/// `GET /v2/extract/{id}` `data` shape (an object, not an array of documents).
#[derive(Debug, Clone)]
pub struct ExtractRecord {
    /// Current lifecycle state of the job.
    pub status: ExtractStatus,
    /// Merged extraction result; `None` until the job completes successfully.
    pub data: Option<serde_json::Value>,
    /// Sum of the LLM `total_tokens` reported by the successful scrapes.
    pub tokens_used: u32,
    /// Credits charged for the job. Each successful scrape counts for at
    /// least one credit, and a finished job always records at least one.
    pub credits_used: u32,
    /// Message of the last scrape error; only set when the job is `Failed`.
    pub error: Option<String>,
    /// When the job was registered; finished records older than the job TTL
    /// are pruned by the cleanup loop.
    pub created_at: Instant,
}
115
/// Shared application state.
///
/// Every field is an `Arc` (or an `Option` of one), so cloning the state only
/// bumps reference counts and all clones observe the same job maps.
#[derive(Clone)]
pub struct AppState {
    /// Server configuration the state was built from.
    pub config: Arc<AppConfig>,
    /// Renderer shared by all scrape, crawl, batch and extract tasks.
    pub renderer: Arc<FallbackRenderer>,
    /// Crawl and `/v2/batch/scrape` jobs, keyed by job id.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// `/v2/extract` jobs. Separate from `crawl_jobs` because an extract result
    /// is a single merged JSON object, not a `Vec<ScrapeData>`.
    pub extract_jobs: Arc<RwLock<HashMap<Uuid, ExtractRecord>>>,
    /// Limits how many crawl / batch tasks run at once; created with
    /// `MAX_CONCURRENT_CRAWLS` permits.
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
    /// SearXNG client. `None` when `[search].searxng_url` is unset, in which
    /// case `/v1/search` returns a clear `search_disabled` error.
    pub searxng: Option<Arc<SearxngClient>>,
    /// Server-wide default /map URL filter. `None` disables the filter
    /// entirely (legacy behaviour). Per-request overrides may swap or
    /// extend this at handler time.
    pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
}
134
135impl AppState {
136    pub fn new(config: AppConfig) -> CrwResult<Self> {
137        // Build the proxy rotator from config (list takes precedence over the
138        // single `proxy`). When present, it owns ALL proxy routing (HTTP pool +
139        // per-request CDP proxyServer), so `new()` gets `proxy = None` and the
140        // rotator is attached via `with_proxy_rotator`. An invalid proxy URL is
141        // a hard startup error — never a silent direct-connection fallback.
142        let proxy_rotator = crw_core::ProxyRotator::build(
143            &config.crawler.proxy_list,
144            config.crawler.proxy.as_deref(),
145            config.crawler.proxy_rotation,
146        )
147        .map_err(CrwError::ConfigError)?
148        .map(Arc::new);
149        let renderer = FallbackRenderer::new(
150            &config.renderer,
151            &config.crawler.user_agent,
152            None,
153            &config.crawler.stealth,
154        )?
155        .with_proxy_rotator(proxy_rotator)?
156        .with_host_limits(
157            config.crawler.requests_per_second,
158            config.crawler.per_host_max_concurrent,
159        );
160
161        let searxng = if config.search.enabled
162            && let Some(url) = config.search.searxng_url.as_ref()
163        {
164            // Dedicated reqwest client for SearXNG so its connection pool is
165            // hot and isolated from the renderer / scrape paths. SearXNG runs
166            // on the same docker network in the bundled compose so a 5s
167            // connect_timeout is generous.
168            let http = reqwest::Client::builder()
169                .connect_timeout(Duration::from_secs(5))
170                .build()
171                .map_err(|e| {
172                    CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
173                })?;
174            let timeout = Duration::from_millis(config.search.timeout_ms);
175            Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
176        } else {
177            None
178        };
179
180        let url_filter_cfg =
181            crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
182        // One-shot snapshot of how many rules the filter knows about. Helps
183        // operators confirm at boot that the deny-lists actually loaded.
184        let m = crw_core::metrics::metrics();
185        m.map_filter_rules_loaded
186            .with_label_values(&["action"])
187            .inc_by(
188                (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
189                    + url_filter_cfg.action_params.len()) as u64,
190            );
191        m.map_filter_rules_loaded
192            .with_label_values(&["tracking"])
193            .inc_by(
194                (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
195                    + url_filter_cfg.tracking_params.len()) as u64,
196            );
197        m.map_filter_rules_loaded
198            .with_label_values(&["preserve"])
199            .inc_by(
200                (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
201                    + url_filter_cfg.preserve_params.len()) as u64,
202            );
203        m.map_filter_rules_loaded
204            .with_label_values(&["host_override"])
205            .inc_by(url_filter_cfg.host_overrides.len() as u64);
206        let url_filter = Some(Arc::new(url_filter_cfg));
207
208        let state = Self {
209            config: Arc::new(config),
210            renderer: Arc::new(renderer),
211            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
212            extract_jobs: Arc::new(RwLock::new(HashMap::new())),
213            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
214            searxng,
215            url_filter,
216        };
217
218        // Wrap the not-yet-returned state in a block to keep the Ok() shape at the end.
219        // Spawn background job cleanup task.
220        let cleanup_state = state.clone();
221        tokio::spawn(async move {
222            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
223            loop {
224                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
225                let mut jobs = cleanup_state.crawl_jobs.write().await;
226                let before = jobs.len();
227                jobs.retain(|_id, job| {
228                    let is_done = matches!(
229                        job.rx.borrow().status,
230                        CrawlStatus::Completed | CrawlStatus::Failed
231                    );
232                    // Keep if not done, or if done but within TTL.
233                    !is_done || job.created_at.elapsed() < ttl
234                });
235                let removed = before - jobs.len();
236                if removed > 0 {
237                    tracing::info!(
238                        removed,
239                        remaining = jobs.len(),
240                        "Cleaned up expired crawl jobs"
241                    );
242                }
243                drop(jobs);
244
245                // Prune finished extract jobs past TTL (keep in-flight ones).
246                let mut ejobs = cleanup_state.extract_jobs.write().await;
247                ejobs.retain(|_id, rec| {
248                    matches!(rec.status, ExtractStatus::Processing)
249                        || rec.created_at.elapsed() < ttl
250                });
251            }
252        });
253
254        Ok(state)
255    }
256
257    /// Start a new crawl job and return its UUID.
258    /// Spawns a background task that acquires the crawl semaphore before running.
259    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
260        let id = Uuid::new_v4();
261        let initial = CrawlState {
262            id,
263            success: true,
264            status: CrawlStatus::InProgress,
265            total: 0,
266            completed: 0,
267            data: vec![],
268            error: None,
269        };
270
271        let (tx, rx) = watch::channel(initial);
272
273        {
274            let mut jobs = self.crawl_jobs.write().await;
275            jobs.insert(
276                id,
277                CrawlJob {
278                    rx,
279                    created_at: Instant::now(),
280                    abort_handle: None,
281                },
282            );
283        }
284
285        let renderer = self.renderer.clone();
286        let max_concurrency = self.config.crawler.max_concurrency;
287        let respect_robots = self.config.crawler.respect_robots_txt;
288        let rps = self.config.crawler.requests_per_second;
289        let user_agent = self.config.crawler.user_agent.clone();
290        let crawl_semaphore = self.crawl_semaphore.clone();
291        let llm_config = self.config.extraction.llm.clone();
292        let proxy = self.config.crawler.proxy.clone();
293        let jitter_factor = self.config.crawler.stealth.jitter_factor;
294        let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
295        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
296
297        let handle = tokio::spawn(async move {
298            let _permit = match crawl_semaphore.acquire().await {
299                Ok(p) => p,
300                Err(_) => {
301                    let _ = tx.send(CrawlState {
302                        id,
303                        success: false,
304                        status: CrawlStatus::Failed,
305                        total: 0,
306                        completed: 0,
307                        data: vec![],
308                        error: Some("Server is overloaded, try again later".into()),
309                    });
310                    return;
311                }
312            };
313            run_crawl(CrawlOptions {
314                id,
315                req,
316                renderer,
317                max_concurrency,
318                respect_robots,
319                requests_per_second: rps,
320                user_agent: &user_agent,
321                state_tx: tx,
322                llm_config: llm_config.as_ref(),
323                proxy,
324                jitter_factor,
325                deadline_ms_per_page,
326                per_host_max_concurrent,
327            })
328            .await;
329        });
330
331        // Store the abort handle so the job can be cancelled via DELETE.
332        {
333            let mut jobs = self.crawl_jobs.write().await;
334            if let Some(job) = jobs.get_mut(&id) {
335                job.abort_handle = Some(handle.abort_handle());
336            }
337        }
338
339        id
340    }
341
342    /// Start a `/v2/batch/scrape` job over an explicit URL list and return its
343    /// UUID. Reuses the crawl-job machinery (`crawl_jobs` + `CrawlState`) but
344    /// scrapes the given URLs directly — no link discovery, no same-origin
345    /// filtering, no dedup; input order is recoverable via `metadata.sourceURL`.
346    pub async fn start_batch_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
347        let id = Uuid::new_v4();
348        let total = urls.len() as u32;
349        let (tx, rx) = watch::channel(CrawlState {
350            id,
351            success: true,
352            status: CrawlStatus::InProgress,
353            total,
354            completed: 0,
355            data: vec![],
356            error: None,
357        });
358        {
359            let mut jobs = self.crawl_jobs.write().await;
360            jobs.insert(
361                id,
362                CrawlJob {
363                    rx,
364                    created_at: Instant::now(),
365                    abort_handle: None,
366                },
367            );
368        }
369
370        let renderer = self.renderer.clone();
371        let crawl_semaphore = self.crawl_semaphore.clone();
372        let config = self.config.clone();
373        let max_concurrency = config.crawler.max_concurrency.max(1);
374
375        let handle = tokio::spawn(async move {
376            let _permit = match crawl_semaphore.acquire().await {
377                Ok(p) => p,
378                Err(_) => {
379                    let _ = tx.send(CrawlState {
380                        id,
381                        success: false,
382                        status: CrawlStatus::Failed,
383                        total,
384                        completed: 0,
385                        data: vec![],
386                        error: Some("Server is overloaded, try again later".into()),
387                    });
388                    return;
389                }
390            };
391
392            if total == 0 {
393                let _ = tx.send(CrawlState {
394                    id,
395                    success: true,
396                    status: CrawlStatus::Completed,
397                    total: 0,
398                    completed: 0,
399                    data: vec![],
400                    error: None,
401                });
402                return;
403            }
404
405            let user_agent = config.crawler.user_agent.clone();
406            let default_stealth =
407                config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
408            let render_js_default = config.renderer.render_js_default;
409            let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
410
411            let reqs: Vec<ScrapeRequest> = urls
412                .into_iter()
413                .map(|u| {
414                    let mut r = template.clone();
415                    r.url = u;
416                    r
417                })
418                .collect();
419
420            futures::stream::iter(reqs)
421                .for_each_concurrent(max_concurrency, |req| {
422                    let renderer = renderer.clone();
423                    let config = config.clone();
424                    let user_agent = user_agent.clone();
425                    let tx = tx.clone();
426                    async move {
427                        let deadline = Deadline::from_request_ms(deadline_ms);
428                        let scraped = scrape_url(
429                            &req,
430                            &renderer,
431                            config.extraction.llm.as_ref(),
432                            &config.extraction,
433                            &user_agent,
434                            default_stealth,
435                            render_js_default,
436                            deadline,
437                        )
438                        .await
439                        .ok();
440                        // Mutate the shared status in place — push one document and
441                        // bump the counter without cloning the whole accumulated Vec
442                        // on every completion (avoids O(n^2) copying on large
443                        // batches). A failed scrape still advances `completed`.
444                        tx.send_modify(|st| {
445                            if let Some(d) = scraped {
446                                st.data.push(d);
447                            }
448                            st.completed += 1;
449                            if st.completed >= total {
450                                st.status = CrawlStatus::Completed;
451                            }
452                        });
453                    }
454                })
455                .await;
456        });
457
458        {
459            let mut jobs = self.crawl_jobs.write().await;
460            if let Some(job) = jobs.get_mut(&id) {
461                job.abort_handle = Some(handle.abort_handle());
462            }
463        }
464
465        id
466    }
467
468    /// Start a `/v2/extract` job. Scrapes each URL with `formats:[json]` + the
469    /// shared schema (already set on `template`) and merges the per-URL `json`
470    /// objects into one — matching the live API's single-object `data` shape.
471    pub async fn start_extract_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
472        let id = Uuid::new_v4();
473        {
474            let mut jobs = self.extract_jobs.write().await;
475            jobs.insert(
476                id,
477                ExtractRecord {
478                    status: ExtractStatus::Processing,
479                    data: None,
480                    tokens_used: 0,
481                    credits_used: 0,
482                    error: None,
483                    created_at: Instant::now(),
484                },
485            );
486        }
487
488        let renderer = self.renderer.clone();
489        let config = self.config.clone();
490        let extract_jobs = self.extract_jobs.clone();
491
492        tokio::spawn(async move {
493            let user_agent = config.crawler.user_agent.clone();
494            let default_stealth =
495                config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
496            let render_js_default = config.renderer.render_js_default;
497            let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
498
499            let mut merged = serde_json::Map::new();
500            let mut tokens = 0u32;
501            let mut credits = 0u32;
502            let mut last_err: Option<String> = None;
503            let mut any_ok = false;
504
505            for u in urls {
506                let mut req = template.clone();
507                req.url = u;
508                let deadline = Deadline::from_request_ms(deadline_ms);
509                match scrape_url(
510                    &req,
511                    &renderer,
512                    config.extraction.llm.as_ref(),
513                    &config.extraction,
514                    &user_agent,
515                    default_stealth,
516                    render_js_default,
517                    deadline,
518                )
519                .await
520                {
521                    Ok(d) => {
522                        any_ok = true;
523                        if let Some(serde_json::Value::Object(obj)) = d.json {
524                            for (k, v) in obj {
525                                merged.insert(k, v);
526                            }
527                        }
528                        if let Some(usage) = d.llm_usage {
529                            tokens += usage.total_tokens;
530                        }
531                        credits += if d.credit_cost == 0 { 1 } else { d.credit_cost };
532                    }
533                    Err(e) => last_err = Some(e.to_string()),
534                }
535            }
536
537            let mut jobs = extract_jobs.write().await;
538            if let Some(rec) = jobs.get_mut(&id) {
539                if !any_ok && last_err.is_some() {
540                    rec.status = ExtractStatus::Failed;
541                    rec.error = last_err;
542                } else {
543                    rec.status = ExtractStatus::Completed;
544                    rec.data = Some(serde_json::Value::Object(merged));
545                }
546                rec.tokens_used = tokens;
547                rec.credits_used = credits.max(1);
548            }
549        });
550
551        id
552    }
553}