1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4 CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5 resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use crw_search::SearxngClient;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::{RwLock, watch};
14use uuid::Uuid;
15
16pub(crate) fn validate_renderer_pin(
27 renderer: Option<RequestedRenderer>,
28 render_js: Option<bool>,
29 state: &AppState,
30) -> CrwResult<()> {
31 let Some(name) = resolve_pinned_renderer(renderer) else {
32 return Ok(());
33 };
34
35 let effective_request = if render_js.is_none() {
41 Some(true)
42 } else {
43 render_js
44 };
45 let effective_render_js =
46 resolve_render_js(effective_request, state.config.renderer.render_js_default);
47
48 if effective_render_js == Some(false) {
49 return Ok(());
50 }
51
52 let available = state.renderer.js_renderer_names();
53 if !available.contains(&name) {
54 return Err(CrwError::InvalidRequest(format!(
55 "renderer '{}' not available; configured renderers: [{}]. \
56 Update server config or omit the 'renderer' field.",
57 name,
58 available.join(", ")
59 )));
60 }
61 Ok(())
62}
63
64pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
66 validate_renderer_pin(req.renderer, req.render_js, state)
67}
68
/// Registry entry for a background crawl started by `AppState::start_crawl_job`.
pub struct CrawlJob {
    /// Receiving side of the job's watch channel; `borrow()` yields the latest `CrawlState`.
    pub rx: watch::Receiver<CrawlState>,
    /// When the job was registered; the cleanup task compares its age against the job TTL.
    pub created_at: Instant,
    /// Handle that can abort the spawned crawl task. `None` from insertion until
    /// `start_crawl_job` has spawned the task and stored the handle.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
76
/// Number of permits on the global crawl semaphore, i.e. how many crawls may
/// run at the same time; additional crawls wait for a permit.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// How often the background cleanup task sweeps the crawl-job registry
/// (60 000 ms = one minute).
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_millis(60_000);
81
/// Shared server state. Cloning is cheap: every field is an `Arc` (or an
/// `Option<Arc>`), so clones share the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// Application configuration the state was built from.
    pub config: Arc<AppConfig>,
    /// Renderer used for fetching pages; also handed to crawl tasks.
    pub renderer: Arc<FallbackRenderer>,
    /// Crawl jobs keyed by id; finished jobs stay here until the cleanup task evicts them.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Limits concurrently running crawls (created with `MAX_CONCURRENT_CRAWLS` permits).
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
    /// SearXNG client; `None` when search is disabled or no SearXNG URL is configured.
    pub searxng: Option<Arc<SearxngClient>>,
}
93
94impl AppState {
95 pub fn new(config: AppConfig) -> CrwResult<Self> {
96 let proxy = config.crawler.proxy.as_deref();
97 let renderer = FallbackRenderer::new(
98 &config.renderer,
99 &config.crawler.user_agent,
100 proxy,
101 &config.crawler.stealth,
102 )?
103 .with_host_limits(
104 config.crawler.requests_per_second,
105 config.crawler.per_host_max_concurrent,
106 );
107
108 let searxng = if config.search.enabled
109 && let Some(url) = config.search.searxng_url.as_ref()
110 {
111 let http = reqwest::Client::builder()
116 .connect_timeout(Duration::from_secs(5))
117 .build()
118 .map_err(|e| {
119 CrwError::Internal(format!("failed to build SearXNG http client: {e}"))
120 })?;
121 let timeout = Duration::from_millis(config.search.timeout_ms);
122 Some(Arc::new(SearxngClient::new(Arc::new(http), url, timeout)))
123 } else {
124 None
125 };
126
127 let state = Self {
128 config: Arc::new(config),
129 renderer: Arc::new(renderer),
130 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
131 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
132 searxng,
133 };
134
135 let cleanup_state = state.clone();
138 tokio::spawn(async move {
139 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
140 loop {
141 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
142 let mut jobs = cleanup_state.crawl_jobs.write().await;
143 let before = jobs.len();
144 jobs.retain(|_id, job| {
145 let is_done = matches!(
146 job.rx.borrow().status,
147 CrawlStatus::Completed | CrawlStatus::Failed
148 );
149 !is_done || job.created_at.elapsed() < ttl
151 });
152 let removed = before - jobs.len();
153 if removed > 0 {
154 tracing::info!(
155 removed,
156 remaining = jobs.len(),
157 "Cleaned up expired crawl jobs"
158 );
159 }
160 }
161 });
162
163 Ok(state)
164 }
165
166 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
169 let id = Uuid::new_v4();
170 let initial = CrawlState {
171 id,
172 success: true,
173 status: CrawlStatus::InProgress,
174 total: 0,
175 completed: 0,
176 data: vec![],
177 error: None,
178 };
179
180 let (tx, rx) = watch::channel(initial);
181
182 {
183 let mut jobs = self.crawl_jobs.write().await;
184 jobs.insert(
185 id,
186 CrawlJob {
187 rx,
188 created_at: Instant::now(),
189 abort_handle: None,
190 },
191 );
192 }
193
194 let renderer = self.renderer.clone();
195 let max_concurrency = self.config.crawler.max_concurrency;
196 let respect_robots = self.config.crawler.respect_robots_txt;
197 let rps = self.config.crawler.requests_per_second;
198 let user_agent = self.config.crawler.user_agent.clone();
199 let crawl_semaphore = self.crawl_semaphore.clone();
200 let llm_config = self.config.extraction.llm.clone();
201 let proxy = self.config.crawler.proxy.clone();
202 let jitter_factor = self.config.crawler.stealth.jitter_factor;
203 let deadline_ms_per_page = self.config.effective_deadline_ms(None, req.wait_for);
204 let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
205
206 let handle = tokio::spawn(async move {
207 let _permit = match crawl_semaphore.acquire().await {
208 Ok(p) => p,
209 Err(_) => {
210 let _ = tx.send(CrawlState {
211 id,
212 success: false,
213 status: CrawlStatus::Failed,
214 total: 0,
215 completed: 0,
216 data: vec![],
217 error: Some("Server is overloaded, try again later".into()),
218 });
219 return;
220 }
221 };
222 run_crawl(CrawlOptions {
223 id,
224 req,
225 renderer,
226 max_concurrency,
227 respect_robots,
228 requests_per_second: rps,
229 user_agent: &user_agent,
230 state_tx: tx,
231 llm_config: llm_config.as_ref(),
232 proxy,
233 jitter_factor,
234 deadline_ms_per_page,
235 per_host_max_concurrent,
236 })
237 .await;
238 });
239
240 {
242 let mut jobs = self.crawl_jobs.write().await;
243 if let Some(job) = jobs.get_mut(&id) {
244 job.abort_handle = Some(handle.abort_handle());
245 }
246 }
247
248 id
249 }
250}