Skip to main content

crw_server/
state.rs

1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4    CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5    resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{RwLock, watch};
13use uuid::Uuid;
14
15/// Validate that a request's pinned renderer is available before accepting
16/// the job. Returns `InvalidRequest` (→ HTTP 400) when the named renderer is
17/// not in the configured pool. Skipped when `renderJs:false` is set, since
18/// HTTP-only ignores the pin.
19///
20/// We surface this explicitly (rather than silently falling back to "auto")
21/// so users get clear feedback when they ask for a renderer the operator
22/// hasn't configured. Sites that fail under one renderer often need a
23/// specific other one — silent fallback would leave callers wondering why
24/// "chrome" gave them the same broken result as "auto".
25pub(crate) fn validate_renderer_pin(
26    renderer: Option<RequestedRenderer>,
27    render_js: Option<bool>,
28    state: &AppState,
29) -> CrwResult<()> {
30    let Some(name) = resolve_pinned_renderer(renderer) else {
31        return Ok(());
32    };
33
34    // Mirror the fetch-path resolution at `crw-crawl/src/single.rs:41-50` so
35    // validation is consistent with what the actual request does. "Pinned
36    // implies JS" — when a renderer is pinned and the request omits
37    // `renderJs`, force the request to JS=true so a `render_js_default=false`
38    // server config doesn't silently send the request through HTTP-only.
39    let effective_request = if render_js.is_none() {
40        Some(true)
41    } else {
42        render_js
43    };
44    let effective_render_js =
45        resolve_render_js(effective_request, state.config.renderer.render_js_default);
46
47    if effective_render_js == Some(false) {
48        return Ok(());
49    }
50
51    let available = state.renderer.js_renderer_names();
52    if !available.contains(&name) {
53        return Err(CrwError::InvalidRequest(format!(
54            "renderer '{}' not available; configured renderers: [{}]. \
55             Update server config or omit the 'renderer' field.",
56            name,
57            available.join(", ")
58        )));
59    }
60    Ok(())
61}
62
63/// Crawl-specific wrapper around [`validate_renderer_pin`].
64pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
65    validate_renderer_pin(req.renderer, req.render_js, state)
66}
67
/// A registered crawl job, as stored in `AppState::crawl_jobs`.
///
/// Holds the receiving end of the job's progress channel, the registration
/// time used for TTL cleanup, and the handle used to cancel the task.
pub struct CrawlJob {
    /// Receiver for the job's latest [`CrawlState`]; the spawned crawl task
    /// owns the matching sender.
    pub rx: watch::Receiver<CrawlState>,
    /// When the job was registered. The cleanup sweep measures the job TTL
    /// from this instant (creation, not completion).
    pub created_at: Instant,
    /// Handle to abort the crawl task (used to cancel the job via DELETE);
    /// `None` while no handle has been recorded for the entry.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
75
/// Maximum number of crawl jobs that may run at the same time.
///
/// Used as the permit count of `AppState::crawl_semaphore`. Jobs started beyond
/// this limit wait for a permit before crawling.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// How often the background task sweeps `AppState::crawl_jobs` for finished
/// jobs whose TTL has expired.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
80
/// Shared application state.
///
/// Every field is an `Arc`, so cloning is cheap. All clones share the same
/// config, renderer, job table and crawl concurrency limit.
#[derive(Clone)]
pub struct AppState {
    /// Server configuration the state was built from.
    pub config: Arc<AppConfig>,
    /// Renderer used for fetches; a clone of this `Arc` is handed to each crawl task.
    pub renderer: Arc<FallbackRenderer>,
    /// Crawl jobs keyed by the id returned from `start_crawl_job`.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Bounds how many crawls run at once (`MAX_CONCURRENT_CRAWLS` permits).
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
}
89
90impl AppState {
91    pub fn new(config: AppConfig) -> CrwResult<Self> {
92        let proxy = config.crawler.proxy.as_deref();
93        let renderer = FallbackRenderer::new(
94            &config.renderer,
95            &config.crawler.user_agent,
96            proxy,
97            &config.crawler.stealth,
98        )?
99        .with_host_limits(
100            config.crawler.requests_per_second,
101            config.crawler.per_host_max_concurrent,
102        );
103
104        let state = Self {
105            config: Arc::new(config),
106            renderer: Arc::new(renderer),
107            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
108            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
109        };
110
111        // Wrap the not-yet-returned state in a block to keep the Ok() shape at the end.
112        // Spawn background job cleanup task.
113        let cleanup_state = state.clone();
114        tokio::spawn(async move {
115            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
116            loop {
117                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
118                let mut jobs = cleanup_state.crawl_jobs.write().await;
119                let before = jobs.len();
120                jobs.retain(|_id, job| {
121                    let is_done = matches!(
122                        job.rx.borrow().status,
123                        CrawlStatus::Completed | CrawlStatus::Failed
124                    );
125                    // Keep if not done, or if done but within TTL.
126                    !is_done || job.created_at.elapsed() < ttl
127                });
128                let removed = before - jobs.len();
129                if removed > 0 {
130                    tracing::info!(
131                        removed,
132                        remaining = jobs.len(),
133                        "Cleaned up expired crawl jobs"
134                    );
135                }
136            }
137        });
138
139        Ok(state)
140    }
141
142    /// Start a new crawl job and return its UUID.
143    /// Spawns a background task that acquires the crawl semaphore before running.
144    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
145        let id = Uuid::new_v4();
146        let initial = CrawlState {
147            id,
148            success: true,
149            status: CrawlStatus::InProgress,
150            total: 0,
151            completed: 0,
152            data: vec![],
153            error: None,
154        };
155
156        let (tx, rx) = watch::channel(initial);
157
158        {
159            let mut jobs = self.crawl_jobs.write().await;
160            jobs.insert(
161                id,
162                CrawlJob {
163                    rx,
164                    created_at: Instant::now(),
165                    abort_handle: None,
166                },
167            );
168        }
169
170        let renderer = self.renderer.clone();
171        let max_concurrency = self.config.crawler.max_concurrency;
172        let respect_robots = self.config.crawler.respect_robots_txt;
173        let rps = self.config.crawler.requests_per_second;
174        let user_agent = self.config.crawler.user_agent.clone();
175        let crawl_semaphore = self.crawl_semaphore.clone();
176        let llm_config = self.config.extraction.llm.clone();
177        let proxy = self.config.crawler.proxy.clone();
178        let jitter_factor = self.config.crawler.stealth.jitter_factor;
179        let deadline_ms_per_page = self.config.request.deadline_ms_default;
180        let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
181
182        let handle = tokio::spawn(async move {
183            let _permit = match crawl_semaphore.acquire().await {
184                Ok(p) => p,
185                Err(_) => {
186                    let _ = tx.send(CrawlState {
187                        id,
188                        success: false,
189                        status: CrawlStatus::Failed,
190                        total: 0,
191                        completed: 0,
192                        data: vec![],
193                        error: Some("Server is overloaded, try again later".into()),
194                    });
195                    return;
196                }
197            };
198            run_crawl(CrawlOptions {
199                id,
200                req,
201                renderer,
202                max_concurrency,
203                respect_robots,
204                requests_per_second: rps,
205                user_agent: &user_agent,
206                state_tx: tx,
207                llm_config: llm_config.as_ref(),
208                proxy,
209                jitter_factor,
210                deadline_ms_per_page,
211                per_host_max_concurrent,
212            })
213            .await;
214        });
215
216        // Store the abort handle so the job can be cancelled via DELETE.
217        {
218            let mut jobs = self.crawl_jobs.write().await;
219            if let Some(job) = jobs.get_mut(&id) {
220                job.abort_handle = Some(handle.abort_handle());
221            }
222        }
223
224        id
225    }
226}