Skip to main content

crw_server/
state.rs

1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4    CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5    resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use crw_search::SearxngClient;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{RwLock, watch};
14use uuid::Uuid;
15
16/// Validate that a request's pinned renderer is available before accepting
17/// the job. Returns `InvalidRequest` (→ HTTP 400) when the named renderer is
18/// not in the configured pool. Skipped when `renderJs:false` is set, since
19/// HTTP-only ignores the pin.
20///
21/// We surface this explicitly (rather than silently falling back to "auto")
22/// so users get clear feedback when they ask for a renderer the operator
23/// hasn't configured. Sites that fail under one renderer often need a
24/// specific other one — silent fallback would leave callers wondering why
25/// "chrome" gave them the same broken result as "auto".
26pub(crate) fn validate_renderer_pin(
27    renderer: Option<RequestedRenderer>,
28    render_js: Option<bool>,
29    state: &AppState,
30) -> CrwResult<()> {
31    let Some(name) = resolve_pinned_renderer(renderer) else {
32        return Ok(());
33    };
34
35    // Mirror the fetch-path resolution at `crw-crawl/src/single.rs:41-50` so
36    // validation is consistent with what the actual request does. "Pinned
37    // implies JS" — when a renderer is pinned and the request omits
38    // `renderJs`, force the request to JS=true so a `render_js_default=false`
39    // server config doesn't silently send the request through HTTP-only.
40    let effective_request = if render_js.is_none() {
41        Some(true)
42    } else {
43        render_js
44    };
45    let effective_render_js =
46        resolve_render_js(effective_request, state.config.renderer.render_js_default);
47
48    if effective_render_js == Some(false) {
49        return Ok(());
50    }
51
52    let available = state.renderer.js_renderer_names();
53    if !available.contains(&name) {
54        return Err(CrwError::InvalidRequest(format!(
55            "renderer '{}' not available; configured renderers: [{}]. \
56             Update server config or omit the 'renderer' field.",
57            name,
58            available.join(", ")
59        )));
60    }
61    Ok(())
62}
63
64/// Crawl-specific wrapper around [`validate_renderer_pin`].
65pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
66    validate_renderer_pin(req.renderer, req.render_js, state)
67}
68
/// Registry entry for a single crawl job, stored in `AppState::crawl_jobs`.
///
/// Pairs the job's state receiver with its creation time so the background
/// cleanup task can evict finished jobs once their TTL has passed.
pub struct CrawlJob {
    /// Receiving half of the `watch` channel that carries the job's latest
    /// [`CrawlState`]; the cleanup task reads `status` through it.
    pub rx: watch::Receiver<CrawlState>,
    /// Instant at which the job was registered. The TTL is measured from
    /// this point (creation), not from when the crawl finished.
    pub created_at: Instant,
    /// Handle to abort the crawl task. `None` when no task handle has been
    /// recorded for the job.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
76
/// Maximum number of concurrent crawl jobs. Used as the permit count of the
/// crawl semaphore created in `AppState::new`.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Interval between expired crawl job cleanup runs: the background cleanup
/// loop sleeps for this long before each sweep of the job registry.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
81
/// Shared application state.
///
/// Every field is an `Arc` (or an `Option<Arc<_>>`), so the derived `Clone`
/// only bumps reference counts and all clones observe the same data.
#[derive(Clone)]
pub struct AppState {
    /// Server configuration the state was built from.
    pub config: Arc<AppConfig>,
    /// Renderer shared by all requests; also the source of the JS renderer
    /// names used to validate pinned renderers.
    pub renderer: Arc<FallbackRenderer>,
    /// Registry of crawl jobs keyed by job id, guarded by an async `RwLock`.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Limits how many crawl tasks run at once; each spawned crawl task
    /// acquires a permit before it starts crawling.
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
    /// SearXNG client. `None` when `[search].enabled` is false or
    /// `[search].searxng_url` is unset, in which case `/v1/search` returns
    /// a clear `search_disabled` error.
    pub searxng: Option<Arc<SearxngClient>>,
}
93
94impl AppState {
95    pub fn new(config: AppConfig) -> CrwResult<Self> {
96        let proxy = config.crawler.proxy.as_deref();
97        let renderer = FallbackRenderer::new(
98            &config.renderer,
99            &config.crawler.user_agent,
100            proxy,
101            &config.crawler.stealth,
102        )?
103        .with_host_limits(
104            config.crawler.requests_per_second,
105            config.crawler.per_host_max_concurrent,
106        );
107
108        let searxng = if config.search.enabled
109            && let Some(url) = config.search.searxng_url.as_ref()
110        {
111            // Dedicated reqwest client for SearXNG so its connection pool is
112            // hot and isolated from the renderer / scrape paths. SearXNG runs
113            // on the same docker network in the bundled compose so a 5s
114            // connect_timeout is generous.
115            let http = reqwest::Client::builder()
116                .connect_timeout(Duration::from_secs(5))
117                .build()
118                .map_err(|e| {
119                    CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
120                })?;
121            let timeout = Duration::from_millis(config.search.timeout_ms);
122            Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
123        } else {
124            None
125        };
126
127        let state = Self {
128            config: Arc::new(config),
129            renderer: Arc::new(renderer),
130            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
131            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
132            searxng,
133        };
134
135        // Wrap the not-yet-returned state in a block to keep the Ok() shape at the end.
136        // Spawn background job cleanup task.
137        let cleanup_state = state.clone();
138        tokio::spawn(async move {
139            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
140            loop {
141                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
142                let mut jobs = cleanup_state.crawl_jobs.write().await;
143                let before = jobs.len();
144                jobs.retain(|_id, job| {
145                    let is_done = matches!(
146                        job.rx.borrow().status,
147                        CrawlStatus::Completed | CrawlStatus::Failed
148                    );
149                    // Keep if not done, or if done but within TTL.
150                    !is_done || job.created_at.elapsed() < ttl
151                });
152                let removed = before - jobs.len();
153                if removed > 0 {
154                    tracing::info!(
155                        removed,
156                        remaining = jobs.len(),
157                        "Cleaned up expired crawl jobs"
158                    );
159                }
160            }
161        });
162
163        Ok(state)
164    }
165
166    /// Start a new crawl job and return its UUID.
167    /// Spawns a background task that acquires the crawl semaphore before running.
168    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
169        let id = Uuid::new_v4();
170        let initial = CrawlState {
171            id,
172            success: true,
173            status: CrawlStatus::InProgress,
174            total: 0,
175            completed: 0,
176            data: vec![],
177            error: None,
178        };
179
180        let (tx, rx) = watch::channel(initial);
181
182        {
183            let mut jobs = self.crawl_jobs.write().await;
184            jobs.insert(
185                id,
186                CrawlJob {
187                    rx,
188                    created_at: Instant::now(),
189                    abort_handle: None,
190                },
191            );
192        }
193
194        let renderer = self.renderer.clone();
195        let max_concurrency = self.config.crawler.max_concurrency;
196        let respect_robots = self.config.crawler.respect_robots_txt;
197        let rps = self.config.crawler.requests_per_second;
198        let user_agent = self.config.crawler.user_agent.clone();
199        let crawl_semaphore = self.crawl_semaphore.clone();
200        let llm_config = self.config.extraction.llm.clone();
201        let proxy = self.config.crawler.proxy.clone();
202        let jitter_factor = self.config.crawler.stealth.jitter_factor;
203        let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
204        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
205
206        let handle = tokio::spawn(async move {
207            let _permit = match crawl_semaphore.acquire().await {
208                Ok(p) => p,
209                Err(_) => {
210                    let _ = tx.send(CrawlState {
211                        id,
212                        success: false,
213                        status: CrawlStatus::Failed,
214                        total: 0,
215                        completed: 0,
216                        data: vec![],
217                        error: Some("Server is overloaded, try again later".into()),
218                    });
219                    return;
220                }
221            };
222            run_crawl(CrawlOptions {
223                id,
224                req,
225                renderer,
226                max_concurrency,
227                respect_robots,
228                requests_per_second: rps,
229                user_agent: &user_agent,
230                state_tx: tx,
231                llm_config: llm_config.as_ref(),
232                proxy,
233                jitter_factor,
234                deadline_ms_per_page,
235                per_host_max_concurrent,
236            })
237            .await;
238        });
239
240        // Store the abort handle so the job can be cancelled via DELETE.
241        {
242            let mut jobs = self.crawl_jobs.write().await;
243            if let Some(job) = jobs.get_mut(&id) {
244                job.abort_handle = Some(handle.abort_handle());
245            }
246        }
247
248        id
249    }
250}