1use crw_core::config::AppConfig;
2use crw_core::error::{CrwError, CrwResult};
3use crw_core::types::{
4 CrawlRequest, CrawlState, CrawlStatus, RequestedRenderer, resolve_pinned_renderer,
5 resolve_render_js,
6};
7use crw_crawl::crawl::{CrawlOptions, run_crawl};
8use crw_renderer::FallbackRenderer;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12use tokio::sync::{RwLock, watch};
13use uuid::Uuid;
14
15pub(crate) fn validate_renderer_pin(
26 renderer: Option<RequestedRenderer>,
27 render_js: Option<bool>,
28 state: &AppState,
29) -> CrwResult<()> {
30 let Some(name) = resolve_pinned_renderer(renderer) else {
31 return Ok(());
32 };
33
34 let effective_request = if render_js.is_none() {
40 Some(true)
41 } else {
42 render_js
43 };
44 let effective_render_js =
45 resolve_render_js(effective_request, state.config.renderer.render_js_default);
46
47 if effective_render_js == Some(false) {
48 return Ok(());
49 }
50
51 let available = state.renderer.js_renderer_names();
52 if !available.contains(&name) {
53 return Err(CrwError::InvalidRequest(format!(
54 "renderer '{}' not available; configured renderers: [{}]. \
55 Update server config or omit the 'renderer' field.",
56 name,
57 available.join(", ")
58 )));
59 }
60 Ok(())
61}
62
63pub(crate) fn validate_crawl_renderer(req: &CrawlRequest, state: &AppState) -> CrwResult<()> {
65 validate_renderer_pin(req.renderer, req.render_js, state)
66}
67
68pub struct CrawlJob {
70 pub rx: watch::Receiver<CrawlState>,
71 pub created_at: Instant,
72 pub abort_handle: Option<tokio::task::AbortHandle>,
74}
75
/// Number of permits in `AppState::crawl_semaphore`, i.e. how many crawls may run at once.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Pause between consecutive passes of the crawl-job cleanup task (one minute).
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
80
81#[derive(Clone)]
83pub struct AppState {
84 pub config: Arc<AppConfig>,
85 pub renderer: Arc<FallbackRenderer>,
86 pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
87 pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
88}
89
90impl AppState {
91 pub fn new(config: AppConfig) -> CrwResult<Self> {
92 let proxy = config.crawler.proxy.as_deref();
93 let renderer = FallbackRenderer::new(
94 &config.renderer,
95 &config.crawler.user_agent,
96 proxy,
97 &config.crawler.stealth,
98 )?
99 .with_host_limits(
100 config.crawler.requests_per_second,
101 config.crawler.per_host_max_concurrent,
102 );
103
104 let state = Self {
105 config: Arc::new(config),
106 renderer: Arc::new(renderer),
107 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
108 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
109 };
110
111 let cleanup_state = state.clone();
114 tokio::spawn(async move {
115 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
116 loop {
117 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
118 let mut jobs = cleanup_state.crawl_jobs.write().await;
119 let before = jobs.len();
120 jobs.retain(|_id, job| {
121 let is_done = matches!(
122 job.rx.borrow().status,
123 CrawlStatus::Completed | CrawlStatus::Failed
124 );
125 !is_done || job.created_at.elapsed() < ttl
127 });
128 let removed = before - jobs.len();
129 if removed > 0 {
130 tracing::info!(
131 removed,
132 remaining = jobs.len(),
133 "Cleaned up expired crawl jobs"
134 );
135 }
136 }
137 });
138
139 Ok(state)
140 }
141
142 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
145 let id = Uuid::new_v4();
146 let initial = CrawlState {
147 id,
148 success: true,
149 status: CrawlStatus::InProgress,
150 total: 0,
151 completed: 0,
152 data: vec![],
153 error: None,
154 };
155
156 let (tx, rx) = watch::channel(initial);
157
158 {
159 let mut jobs = self.crawl_jobs.write().await;
160 jobs.insert(
161 id,
162 CrawlJob {
163 rx,
164 created_at: Instant::now(),
165 abort_handle: None,
166 },
167 );
168 }
169
170 let renderer = self.renderer.clone();
171 let max_concurrency = self.config.crawler.max_concurrency;
172 let respect_robots = self.config.crawler.respect_robots_txt;
173 let rps = self.config.crawler.requests_per_second;
174 let user_agent = self.config.crawler.user_agent.clone();
175 let crawl_semaphore = self.crawl_semaphore.clone();
176 let llm_config = self.config.extraction.llm.clone();
177 let proxy = self.config.crawler.proxy.clone();
178 let jitter_factor = self.config.crawler.stealth.jitter_factor;
179 let deadline_ms_per_page = self.config.request.deadline_ms_default;
180 let per_host_max_concurrent = self.config.crawler.per_host_max_concurrent;
181
182 let handle = tokio::spawn(async move {
183 let _permit = match crawl_semaphore.acquire().await {
184 Ok(p) => p,
185 Err(_) => {
186 let _ = tx.send(CrawlState {
187 id,
188 success: false,
189 status: CrawlStatus::Failed,
190 total: 0,
191 completed: 0,
192 data: vec![],
193 error: Some("Server is overloaded, try again later".into()),
194 });
195 return;
196 }
197 };
198 run_crawl(CrawlOptions {
199 id,
200 req,
201 renderer,
202 max_concurrency,
203 respect_robots,
204 requests_per_second: rps,
205 user_agent: &user_agent,
206 state_tx: tx,
207 llm_config: llm_config.as_ref(),
208 proxy,
209 jitter_factor,
210 deadline_ms_per_page,
211 per_host_max_concurrent,
212 })
213 .await;
214 });
215
216 {
218 let mut jobs = self.crawl_jobs.write().await;
219 if let Some(job) = jobs.get_mut(&id) {
220 job.abort_handle = Some(handle.abort_handle());
221 }
222 }
223
224 id
225 }
226}