Skip to main content

crw_server/
state.rs

1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4    CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5    resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use crw_search::SearxngClient;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{RwLock, watch};
14use uuid::Uuid;
15
16/// Validate that a request's pinned renderer is available before accepting
17/// the job. Returns `InvalidRequest` (→ HTTP 400) when the named renderer is
18/// not in the configured pool. Skipped when `renderJs:false` is set, since
19/// HTTP-only ignores the pin.
20///
21/// We surface this explicitly (rather than silently falling back to "auto")
22/// so users get clear feedback when they ask for a renderer the operator
23/// hasn't configured. Sites that fail under one renderer often need a
24/// specific other one — silent fallback would leave callers wondering why
25/// "chrome" gave them the same broken result as "auto".
26pub(crate) fn validate_renderer_pin(
27    renderer: Option<RequestedRenderer>,
28    render_js: Option<bool>,
29    state: &AppState,
30) -> CrwResult<()> {
31    let Some(name) = resolve_pinned_renderer(renderer) else {
32        return Ok(());
33    };
34
35    // Mirror the fetch-path resolution at `crw-crawl/src/single.rs:41-50` so
36    // validation is consistent with what the actual request does. "Pinned
37    // implies JS" — when a renderer is pinned and the request omits
38    // `renderJs`, force the request to JS=true so a `render_js_default=false`
39    // server config doesn't silently send the request through HTTP-only.
40    let effective_request = if render_js.is_none() {
41        Some(true)
42    } else {
43        render_js
44    };
45    let effective_render_js =
46        resolve_render_js(effective_request, state.config.renderer.render_js_default);
47
48    if effective_render_js == Some(false) {
49        return Ok(());
50    }
51
52    let available = state.renderer.js_renderer_names();
53    if !available.contains(&name) {
54        return Err(CrwError::InvalidRequest(format!(
55            "renderer '{}' not available; configured renderers: [{}]. \
56             Update server config or omit the 'renderer' field.",
57            name,
58            available.join(", ")
59        )));
60    }
61    Ok(())
62}
63
64/// Crawl-specific wrapper around [`validate_renderer_pin`].
65pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
66    validate_renderer_pin(req.renderer, req.render_js, state)
67}
68
/// Tracks a crawl job receiver + creation time for TTL cleanup.
///
/// One entry per job id is kept in [`AppState::crawl_jobs`].
pub struct CrawlJob {
    /// Receiving half of the job's `watch` channel; `borrow()` exposes the
    /// most recently published [`CrawlState`].
    pub rx: watch::Receiver<CrawlState>,
    /// When the job entry was created. The cleanup task compares
    /// `created_at.elapsed()` with the configured job TTL, so the TTL is
    /// counted from creation rather than from completion.
    pub created_at: Instant,
    /// Handle to abort the crawl task. `None` when no handle has been
    /// recorded for the job.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
76
/// Maximum number of concurrent crawl jobs.
///
/// Used as the permit count of `AppState::crawl_semaphore`; each crawl task
/// acquires one permit before it starts crawling.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Interval between expired crawl job cleanup runs.
///
/// The background cleanup task sleeps for this long before each sweep of
/// `AppState::crawl_jobs`.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
81
/// Shared application state.
///
/// Every field is an `Arc` (or an `Option` of one), so `clone()` only bumps
/// reference counts and all clones observe the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// Server configuration the state was built from.
    pub config: Arc<AppConfig>,
    /// Renderer shared by all requests; also queried for the names of the
    /// configured JS renderers when validating a pinned renderer.
    pub renderer: Arc<FallbackRenderer>,
    /// Registry of crawl jobs keyed by job id. Finished jobs are removed by
    /// the background cleanup task once they are older than the job TTL.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Semaphore created with `MAX_CONCURRENT_CRAWLS` permits; a crawl task
    /// waits for a permit before it starts running.
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
    /// SearXNG client. `None` when `[search].enabled` is false or
    /// `[search].searxng_url` is unset, in which case `/v1/search` returns a
    /// clear `search_disabled` error.
    pub searxng: Option<Arc<SearxngClient>>,
    /// Server-wide default /map URL filter. `None` disables the filter
    /// entirely (legacy behaviour). Per-request overrides may swap or
    /// extend this at handler time. `AppState::new` always populates it
    /// from `config.map.url_filter`.
    pub url_filter: Option<Arc<crw_crawl::url_filter::UrlFilterCfg>>,
}
97
98impl AppState {
99    pub fn new(config: AppConfig) -> CrwResult<Self> {
100        let proxy = config.crawler.proxy.as_deref();
101        let renderer = FallbackRenderer::new(
102            &config.renderer,
103            &config.crawler.user_agent,
104            proxy,
105            &config.crawler.stealth,
106        )?
107        .with_host_limits(
108            config.crawler.requests_per_second,
109            config.crawler.per_host_max_concurrent,
110        );
111
112        let searxng = if config.search.enabled
113            && let Some(url) = config.search.searxng_url.as_ref()
114        {
115            // Dedicated reqwest client for SearXNG so its connection pool is
116            // hot and isolated from the renderer / scrape paths. SearXNG runs
117            // on the same docker network in the bundled compose so a 5s
118            // connect_timeout is generous.
119            let http = reqwest::Client::builder()
120                .connect_timeout(Duration::from_secs(5))
121                .build()
122                .map_err(|e| {
123                    CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
124                })?;
125            let timeout = Duration::from_millis(config.search.timeout_ms);
126            Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
127        } else {
128            None
129        };
130
131        let url_filter_cfg =
132            crw_crawl::url_filter::UrlFilterCfg::from_map_config(&config.map.url_filter);
133        // One-shot snapshot of how many rules the filter knows about. Helps
134        // operators confirm at boot that the deny-lists actually loaded.
135        let m = crw_core::metrics::metrics();
136        m.map_filter_rules_loaded
137            .with_label_values(&["action"])
138            .inc_by(
139                (crw_crawl::url_filter_data::DEFAULT_ACTION_PARAMS.len()
140                    + url_filter_cfg.action_params.len()) as u64,
141            );
142        m.map_filter_rules_loaded
143            .with_label_values(&["tracking"])
144            .inc_by(
145                (crw_crawl::url_filter_data::DEFAULT_TRACKING_PARAMS.len()
146                    + url_filter_cfg.tracking_params.len()) as u64,
147            );
148        m.map_filter_rules_loaded
149            .with_label_values(&["preserve"])
150            .inc_by(
151                (crw_crawl::url_filter_data::ALWAYS_PRESERVE.len()
152                    + url_filter_cfg.preserve_params.len()) as u64,
153            );
154        m.map_filter_rules_loaded
155            .with_label_values(&["host_override"])
156            .inc_by(url_filter_cfg.host_overrides.len() as u64);
157        let url_filter = Some(Arc::new(url_filter_cfg));
158
159        let state = Self {
160            config: Arc::new(config),
161            renderer: Arc::new(renderer),
162            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
163            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
164            searxng,
165            url_filter,
166        };
167
168        // Wrap the not-yet-returned state in a block to keep the Ok() shape at the end.
169        // Spawn background job cleanup task.
170        let cleanup_state = state.clone();
171        tokio::spawn(async move {
172            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
173            loop {
174                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
175                let mut jobs = cleanup_state.crawl_jobs.write().await;
176                let before = jobs.len();
177                jobs.retain(|_id, job| {
178                    let is_done = matches!(
179                        job.rx.borrow().status,
180                        CrawlStatus::Completed | CrawlStatus::Failed
181                    );
182                    // Keep if not done, or if done but within TTL.
183                    !is_done || job.created_at.elapsed() < ttl
184                });
185                let removed = before - jobs.len();
186                if removed > 0 {
187                    tracing::info!(
188                        removed,
189                        remaining = jobs.len(),
190                        "Cleaned up expired crawl jobs"
191                    );
192                }
193            }
194        });
195
196        Ok(state)
197    }
198
199    /// Start a new crawl job and return its UUID.
200    /// Spawns a background task that acquires the crawl semaphore before running.
201    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
202        let id = Uuid::new_v4();
203        let initial = CrawlState {
204            id,
205            success: true,
206            status: CrawlStatus::InProgress,
207            total: 0,
208            completed: 0,
209            data: vec![],
210            error: None,
211        };
212
213        let (tx, rx) = watch::channel(initial);
214
215        {
216            let mut jobs = self.crawl_jobs.write().await;
217            jobs.insert(
218                id,
219                CrawlJob {
220                    rx,
221                    created_at: Instant::now(),
222                    abort_handle: None,
223                },
224            );
225        }
226
227        let renderer = self.renderer.clone();
228        let max_concurrency = self.config.crawler.max_concurrency;
229        let respect_robots = self.config.crawler.respect_robots_txt;
230        let rps = self.config.crawler.requests_per_second;
231        let user_agent = self.config.crawler.user_agent.clone();
232        let crawl_semaphore = self.crawl_semaphore.clone();
233        let llm_config = self.config.extraction.llm.clone();
234        let proxy = self.config.crawler.proxy.clone();
235        let jitter_factor = self.config.crawler.stealth.jitter_factor;
236        let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
237        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
238
239        let handle = tokio::spawn(async move {
240            let _permit = match crawl_semaphore.acquire().await {
241                Ok(p) => p,
242                Err(_) => {
243                    let _ = tx.send(CrawlState {
244                        id,
245                        success: false,
246                        status: CrawlStatus::Failed,
247                        total: 0,
248                        completed: 0,
249                        data: vec![],
250                        error: Some("Server is overloaded, try again later".into()),
251                    });
252                    return;
253                }
254            };
255            run_crawl(CrawlOptions {
256                id,
257                req,
258                renderer,
259                max_concurrency,
260                respect_robots,
261                requests_per_second: rps,
262                user_agent: &user_agent,
263                state_tx: tx,
264                llm_config: llm_config.as_ref(),
265                proxy,
266                jitter_factor,
267                deadline_ms_per_page,
268                per_host_max_concurrent,
269            })
270            .await;
271        });
272
273        // Store the abort handle so the job can be cancelled via DELETE.
274        {
275            let mut jobs = self.crawl_jobs.write().await;
276            if let Some(job) = jobs.get_mut(&id) {
277                job.abort_handle = Some(handle.abort_handle());
278            }
279        }
280
281        id
282    }
283}