Skip to main content

crw_server/
state.rs

1use crw_core::Deadline;
2use crw_core::config::AppConfig;
3use crw_core::error::{CrwError, CrwResult};
4use crw_core::types::{
5    CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, ScrapeRequest,
6    resolve_pinned_renderer, resolve_render_js,
7};
8use crw_crawl::crawl::{CrawlOptions, run_crawl};
9use crw_crawl::single::scrape_url;
10use crw_renderer::FallbackRenderer;
11use crw_search::SearxngClient;
12use futures::stream::StreamExt;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::{RwLock, watch};
17use uuid::Uuid;
18
19/// Validate that a request's pinned renderer is available before accepting
20/// the job. Returns `InvalidRequest` (→ HTTP 400) when the named renderer is
21/// not in the configured pool. Skipped when `renderJs:false` is set, since
22/// HTTP-only ignores the pin.
23///
24/// We surface this explicitly (rather than silently falling back to "auto")
25/// so users get clear feedback when they ask for a renderer the operator
26/// hasn't configured. Sites that fail under one renderer often need a
27/// specific other one — silent fallback would leave callers wondering why
28/// "chrome" gave them the same broken result as "auto".
29pub(crate) fn validate_renderer_pin(
30    renderer: Option<RequestedRenderer>,
31    render_js: Option<bool>,
32    state: &AppState,
33) -> CrwResult<()> {
34    let Some(name) = resolve_pinned_renderer(renderer) else {
35        return Ok(());
36    };
37
38    // Mirror the fetch-path resolution at `crw-crawl/src/single.rs:41-50` so
39    // validation is consistent with what the actual request does. "Pinned
40    // implies JS" — when a renderer is pinned and the request omits
41    // `renderJs`, force the request to JS=true so a `render_js_default=false`
42    // server config doesn't silently send the request through HTTP-only.
43    let effective_request = if render_js.is_none() {
44        Some(true)
45    } else {
46        render_js
47    };
48    let effective_render_js =
49        resolve_render_js(effective_request, state.config.renderer.render_js_default);
50
51    if effective_render_js == Some(false) {
52        return Ok(());
53    }
54
55    let available = state.renderer.js_renderer_names();
56    if !available.contains(&name) {
57        return Err(CrwError::InvalidRequest(format!(
58            "renderer '{}' not available; configured renderers: [{}]. \
59             Update server config or omit the 'renderer' field.",
60            name,
61            available.join(", ")
62        )));
63    }
64    Ok(())
65}
66
67/// Crawl-specific wrapper around [`validate_renderer_pin`].
68pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
69    validate_renderer_pin(req.renderer, req.render_js, state)
70}
71
72/// Tracks a crawl job receiver + creation time for TTL cleanup.
73pub struct CrawlJob {
74    pub rx: watch::Receiver<CrawlState>,
75    pub created_at: Instant,
76    /// Handle to abort the crawl task.
77    pub abort_handle: Option<tokio::task::AbortHandle>,
78}
79
/// Number of permits in `AppState::crawl_semaphore`, which caps how many
/// crawl/batch jobs run at once. Additional jobs wait for a permit.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// How often the background task sweeps expired jobs (one minute).
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_millis(60_000);
84
/// Lifecycle states of an asynchronous `/v2/extract` job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExtractStatus {
    /// The job is still scraping and merging results.
    Processing,
    /// The job finished and its merged data is available.
    Completed,
    /// The job ended with an error (see the record's `error` field).
    Failed,
}

impl ExtractStatus {
    /// Lowercase wire name of the status, such as `"processing"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Processing => "processing",
            Self::Completed => "completed",
            Self::Failed => "failed",
        }
    }
}
102
103/// A `/v2/extract` job record. `data` is the single merged JSON object (the
104/// scrape's `json` field unioned across URLs), matching the live API's
105/// `GET /v2/extract/{id}` `data` shape (an object, not an array of documents).
106#[derive(Debug, Clone)]
107pub struct ExtractRecord {
108    pub status: ExtractStatus,
109    pub data: Option<serde_json::Value>,
110    pub tokens_used: u32,
111    pub credits_used: u32,
112    pub error: Option<String>,
113    pub created_at: Instant,
114}
115
116/// Shared application state.
117#[derive(Clone)]
118pub struct AppState {
119    pub config: Arc<AppConfig>,
120    pub renderer: Arc<FallbackRenderer>,
121    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
122    /// `/v2/extract` jobs. Separate from `crawl_jobs` because an extract result
123    /// is a single merged JSON object, not a `Vec<ScrapeData>`.
124    pub extract_jobs: Arc<RwLock<HashMap<Uuid, ExtractRecord>>>,
125    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
126    /// SearXNG client. `None` when `[search].searxng_url` is unset, in which
127    /// case `/v1/search` returns a clear `search_disabled` error.
128    pub searxng: Option<Arc<SearxngClient>>,
129    /// Server-wide default /map URL filter. `None` disables the filter
130    /// entirely (legacy behaviour). Per-request overrides may swap or
131    /// extend this at handler time.
132    pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
133}
134
135impl AppState {
136    pub fn new(config: AppConfig) -> CrwResult<Self> {
137        let proxy = config.crawler.proxy.as_deref();
138        let renderer = FallbackRenderer::new(
139            &config.renderer,
140            &config.crawler.user_agent,
141            proxy,
142            &config.crawler.stealth,
143        )?
144        .with_host_limits(
145            config.crawler.requests_per_second,
146            config.crawler.per_host_max_concurrent,
147        );
148
149        let searxng = if config.search.enabled
150            && let Some(url) = config.search.searxng_url.as_ref()
151        {
152            // Dedicated reqwest client for SearXNG so its connection pool is
153            // hot and isolated from the renderer / scrape paths. SearXNG runs
154            // on the same docker network in the bundled compose so a 5s
155            // connect_timeout is generous.
156            let http = reqwest::Client::builder()
157                .connect_timeout(Duration::from_secs(5))
158                .build()
159                .map_err(|e| {
160                    CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
161                })?;
162            let timeout = Duration::from_millis(config.search.timeout_ms);
163            Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
164        } else {
165            None
166        };
167
168        let url_filter_cfg =
169            crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
170        // One-shot snapshot of how many rules the filter knows about. Helps
171        // operators confirm at boot that the deny-lists actually loaded.
172        let m = crw_core::metrics::metrics();
173        m.map_filter_rules_loaded
174            .with_label_values(&["action"])
175            .inc_by(
176                (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
177                    + url_filter_cfg.action_params.len()) as u64,
178            );
179        m.map_filter_rules_loaded
180            .with_label_values(&["tracking"])
181            .inc_by(
182                (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
183                    + url_filter_cfg.tracking_params.len()) as u64,
184            );
185        m.map_filter_rules_loaded
186            .with_label_values(&["preserve"])
187            .inc_by(
188                (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
189                    + url_filter_cfg.preserve_params.len()) as u64,
190            );
191        m.map_filter_rules_loaded
192            .with_label_values(&["host_override"])
193            .inc_by(url_filter_cfg.host_overrides.len() as u64);
194        let url_filter = Some(Arc::new(url_filter_cfg));
195
196        let state = Self {
197            config: Arc::new(config),
198            renderer: Arc::new(renderer),
199            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
200            extract_jobs: Arc::new(RwLock::new(HashMap::new())),
201            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
202            searxng,
203            url_filter,
204        };
205
206        // Wrap the not-yet-returned state in a block to keep the Ok() shape at the end.
207        // Spawn background job cleanup task.
208        let cleanup_state = state.clone();
209        tokio::spawn(async move {
210            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
211            loop {
212                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
213                let mut jobs = cleanup_state.crawl_jobs.write().await;
214                let before = jobs.len();
215                jobs.retain(|_id, job| {
216                    let is_done = matches!(
217                        job.rx.borrow().status,
218                        CrawlStatus::Completed | CrawlStatus::Failed
219                    );
220                    // Keep if not done, or if done but within TTL.
221                    !is_done || job.created_at.elapsed() < ttl
222                });
223                let removed = before - jobs.len();
224                if removed > 0 {
225                    tracing::info!(
226                        removed,
227                        remaining = jobs.len(),
228                        "Cleaned up expired crawl jobs"
229                    );
230                }
231                drop(jobs);
232
233                // Prune finished extract jobs past TTL (keep in-flight ones).
234                let mut ejobs = cleanup_state.extract_jobs.write().await;
235                ejobs.retain(|_id, rec| {
236                    matches!(rec.status, ExtractStatus::Processing)
237                        || rec.created_at.elapsed() < ttl
238                });
239            }
240        });
241
242        Ok(state)
243    }
244
245    /// Start a new crawl job and return its UUID.
246    /// Spawns a background task that acquires the crawl semaphore before running.
247    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
248        let id = Uuid::new_v4();
249        let initial = CrawlState {
250            id,
251            success: true,
252            status: CrawlStatus::InProgress,
253            total: 0,
254            completed: 0,
255            data: vec![],
256            error: None,
257        };
258
259        let (tx, rx) = watch::channel(initial);
260
261        {
262            let mut jobs = self.crawl_jobs.write().await;
263            jobs.insert(
264                id,
265                CrawlJob {
266                    rx,
267                    created_at: Instant::now(),
268                    abort_handle: None,
269                },
270            );
271        }
272
273        let renderer = self.renderer.clone();
274        let max_concurrency = self.config.crawler.max_concurrency;
275        let respect_robots = self.config.crawler.respect_robots_txt;
276        let rps = self.config.crawler.requests_per_second;
277        let user_agent = self.config.crawler.user_agent.clone();
278        let crawl_semaphore = self.crawl_semaphore.clone();
279        let llm_config = self.config.extraction.llm.clone();
280        let proxy = self.config.crawler.proxy.clone();
281        let jitter_factor = self.config.crawler.stealth.jitter_factor;
282        let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
283        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
284
285        let handle = tokio::spawn(async move {
286            let _permit = match crawl_semaphore.acquire().await {
287                Ok(p) => p,
288                Err(_) => {
289                    let _ = tx.send(CrawlState {
290                        id,
291                        success: false,
292                        status: CrawlStatus::Failed,
293                        total: 0,
294                        completed: 0,
295                        data: vec![],
296                        error: Some("Server is overloaded, try again later".into()),
297                    });
298                    return;
299                }
300            };
301            run_crawl(CrawlOptions {
302                id,
303                req,
304                renderer,
305                max_concurrency,
306                respect_robots,
307                requests_per_second: rps,
308                user_agent: &user_agent,
309                state_tx: tx,
310                llm_config: llm_config.as_ref(),
311                proxy,
312                jitter_factor,
313                deadline_ms_per_page,
314                per_host_max_concurrent,
315            })
316            .await;
317        });
318
319        // Store the abort handle so the job can be cancelled via DELETE.
320        {
321            let mut jobs = self.crawl_jobs.write().await;
322            if let Some(job) = jobs.get_mut(&id) {
323                job.abort_handle = Some(handle.abort_handle());
324            }
325        }
326
327        id
328    }
329
330    /// Start a `/v2/batch/scrape` job over an explicit URL list and return its
331    /// UUID. Reuses the crawl-job machinery (`crawl_jobs` + `CrawlState`) but
332    /// scrapes the given URLs directly — no link discovery, no same-origin
333    /// filtering, no dedup; input order is recoverable via `metadata.sourceURL`.
334    pub async fn start_batch_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
335        let id = Uuid::new_v4();
336        let total = urls.len() as u32;
337        let (tx, rx) = watch::channel(CrawlState {
338            id,
339            success: true,
340            status: CrawlStatus::InProgress,
341            total,
342            completed: 0,
343            data: vec![],
344            error: None,
345        });
346        {
347            let mut jobs = self.crawl_jobs.write().await;
348            jobs.insert(
349                id,
350                CrawlJob {
351                    rx,
352                    created_at: Instant::now(),
353                    abort_handle: None,
354                },
355            );
356        }
357
358        let renderer = self.renderer.clone();
359        let crawl_semaphore = self.crawl_semaphore.clone();
360        let config = self.config.clone();
361        let max_concurrency = config.crawler.max_concurrency.max(1);
362
363        let handle = tokio::spawn(async move {
364            let _permit = match crawl_semaphore.acquire().await {
365                Ok(p) => p,
366                Err(_) => {
367                    let _ = tx.send(CrawlState {
368                        id,
369                        success: false,
370                        status: CrawlStatus::Failed,
371                        total,
372                        completed: 0,
373                        data: vec![],
374                        error: Some("Server is overloaded, try again later".into()),
375                    });
376                    return;
377                }
378            };
379
380            if total == 0 {
381                let _ = tx.send(CrawlState {
382                    id,
383                    success: true,
384                    status: CrawlStatus::Completed,
385                    total: 0,
386                    completed: 0,
387                    data: vec![],
388                    error: None,
389                });
390                return;
391            }
392
393            let user_agent = config.crawler.user_agent.clone();
394            let default_stealth =
395                config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
396            let render_js_default = config.renderer.render_js_default;
397            let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
398
399            let reqs: Vec<ScrapeRequest> = urls
400                .into_iter()
401                .map(|u| {
402                    let mut r = template.clone();
403                    r.url = u;
404                    r
405                })
406                .collect();
407
408            futures::stream::iter(reqs)
409                .for_each_concurrent(max_concurrency, |req| {
410                    let renderer = renderer.clone();
411                    let config = config.clone();
412                    let user_agent = user_agent.clone();
413                    let tx = tx.clone();
414                    async move {
415                        let deadline = Deadline::from_request_ms(deadline_ms);
416                        let scraped = scrape_url(
417                            &req,
418                            &renderer,
419                            config.extraction.llm.as_ref(),
420                            &config.extraction,
421                            &user_agent,
422                            default_stealth,
423                            render_js_default,
424                            deadline,
425                        )
426                        .await
427                        .ok();
428                        // Mutate the shared status in place — push one document and
429                        // bump the counter without cloning the whole accumulated Vec
430                        // on every completion (avoids O(n^2) copying on large
431                        // batches). A failed scrape still advances `completed`.
432                        tx.send_modify(|st| {
433                            if let Some(d) = scraped {
434                                st.data.push(d);
435                            }
436                            st.completed += 1;
437                            if st.completed >= total {
438                                st.status = CrawlStatus::Completed;
439                            }
440                        });
441                    }
442                })
443                .await;
444        });
445
446        {
447            let mut jobs = self.crawl_jobs.write().await;
448            if let Some(job) = jobs.get_mut(&id) {
449                job.abort_handle = Some(handle.abort_handle());
450            }
451        }
452
453        id
454    }
455
456    /// Start a `/v2/extract` job. Scrapes each URL with `formats:[json]` + the
457    /// shared schema (already set on `template`) and merges the per-URL `json`
458    /// objects into one — matching the live API's single-object `data` shape.
459    pub async fn start_extract_job(&self, urls: Vec<String>, template: ScrapeRequest) -> Uuid {
460        let id = Uuid::new_v4();
461        {
462            let mut jobs = self.extract_jobs.write().await;
463            jobs.insert(
464                id,
465                ExtractRecord {
466                    status: ExtractStatus::Processing,
467                    data: None,
468                    tokens_used: 0,
469                    credits_used: 0,
470                    error: None,
471                    created_at: Instant::now(),
472                },
473            );
474        }
475
476        let renderer = self.renderer.clone();
477        let config = self.config.clone();
478        let extract_jobs = self.extract_jobs.clone();
479
480        tokio::spawn(async move {
481            let user_agent = config.crawler.user_agent.clone();
482            let default_stealth =
483                config.crawler.stealth.enabled && config.crawler.stealth.inject_headers;
484            let render_js_default = config.renderer.render_js_default;
485            let deadline_ms = config.effective_deadline_ms(template.deadline_ms, template.wait_for);
486
487            let mut merged = serde_json::Map::new();
488            let mut tokens = 0u32;
489            let mut credits = 0u32;
490            let mut last_err: Option<String> = None;
491            let mut any_ok = false;
492
493            for u in urls {
494                let mut req = template.clone();
495                req.url = u;
496                let deadline = Deadline::from_request_ms(deadline_ms);
497                match scrape_url(
498                    &req,
499                    &renderer,
500                    config.extraction.llm.as_ref(),
501                    &config.extraction,
502                    &user_agent,
503                    default_stealth,
504                    render_js_default,
505                    deadline,
506                )
507                .await
508                {
509                    Ok(d) => {
510                        any_ok = true;
511                        if let Some(serde_json::Value::Object(obj)) = d.json {
512                            for (k, v) in obj {
513                                merged.insert(k, v);
514                            }
515                        }
516                        if let Some(usage) = d.llm_usage {
517                            tokens += usage.total_tokens;
518                        }
519                        credits += if d.credit_cost == 0 { 1 } else { d.credit_cost };
520                    }
521                    Err(e) => last_err = Some(e.to_string()),
522                }
523            }
524
525            let mut jobs = extract_jobs.write().await;
526            if let Some(rec) = jobs.get_mut(&id) {
527                if !any_ok && last_err.is_some() {
528                    rec.status = ExtractStatus::Failed;
529                    rec.error = last_err;
530                } else {
531                    rec.status = ExtractStatus::Completed;
532                    rec.data = Some(serde_json::Value::Object(merged));
533                }
534                rec.tokens_used = tokens;
535                rec.credits_used = credits.max(1);
536            }
537        });
538
539        id
540    }
541}