1use crw_core::config::AppConfig;
2use crw_core::types::{CrawlRequest, CrawlState, CrawlStatus};
3use crw_crawl::crawl::{CrawlOptions, run_crawl};
4use crw_renderer::FallbackRenderer;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::{RwLock, watch};
9use uuid::Uuid;
10
/// A tracked crawl job: the receiver half of the job's state channel plus
/// the bookkeeping needed for TTL-based cleanup and cancellation.
pub struct CrawlJob {
    /// Watch receiver yielding the latest `CrawlState` snapshot for this job.
    pub rx: watch::Receiver<CrawlState>,
    /// When the job was registered; compared against the configured TTL by
    /// the background cleanup task once the job has finished.
    pub created_at: Instant,
    /// Handle used to abort the spawned crawl task. `None` only for the
    /// brief window between job insertion and the task being spawned.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}
18
/// Maximum number of crawl jobs allowed to run at the same time
/// (enforced via `AppState::crawl_semaphore`).
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// How often the background cleanup task scans for expired finished jobs.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);
23
/// Shared application state. Cloning is cheap: every field is an `Arc`,
/// so clones share the same underlying data.
#[derive(Clone)]
pub struct AppState {
    /// Application configuration, immutable after startup.
    pub config: Arc<AppConfig>,
    /// Page renderer shared by all crawl tasks.
    pub renderer: Arc<FallbackRenderer>,
    /// In-flight and recently finished crawl jobs, keyed by job id.
    /// Finished entries are evicted by the cleanup task after their TTL.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Caps the number of concurrently running crawls at
    /// `MAX_CONCURRENT_CRAWLS`.
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
}
32
33impl AppState {
34 pub fn new(config: AppConfig) -> Self {
35 let proxy = config.crawler.proxy.as_deref();
36 let renderer = FallbackRenderer::new(
37 &config.renderer,
38 &config.crawler.user_agent,
39 proxy,
40 &config.crawler.stealth,
41 );
42
43 let state = Self {
44 config: Arc::new(config),
45 renderer: Arc::new(renderer),
46 crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
47 crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
48 };
49
50 let cleanup_state = state.clone();
52 tokio::spawn(async move {
53 let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
54 loop {
55 tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
56 let mut jobs = cleanup_state.crawl_jobs.write().await;
57 let before = jobs.len();
58 jobs.retain(|_id, job| {
59 let is_done = matches!(
60 job.rx.borrow().status,
61 CrawlStatus::Completed | CrawlStatus::Failed
62 );
63 !is_done || job.created_at.elapsed() < ttl
65 });
66 let removed = before - jobs.len();
67 if removed > 0 {
68 tracing::info!(
69 removed,
70 remaining = jobs.len(),
71 "Cleaned up expired crawl jobs"
72 );
73 }
74 }
75 });
76
77 state
78 }
79
80 pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
83 let id = Uuid::new_v4();
84 let initial = CrawlState {
85 id,
86 success: true,
87 status: CrawlStatus::InProgress,
88 total: 0,
89 completed: 0,
90 data: vec![],
91 error: None,
92 };
93
94 let (tx, rx) = watch::channel(initial);
95
96 {
97 let mut jobs = self.crawl_jobs.write().await;
98 jobs.insert(
99 id,
100 CrawlJob {
101 rx,
102 created_at: Instant::now(),
103 abort_handle: None,
104 },
105 );
106 }
107
108 let renderer = self.renderer.clone();
109 let max_concurrency = self.config.crawler.max_concurrency;
110 let respect_robots = self.config.crawler.respect_robots_txt;
111 let rps = self.config.crawler.requests_per_second;
112 let user_agent = self.config.crawler.user_agent.clone();
113 let crawl_semaphore = self.crawl_semaphore.clone();
114 let llm_config = self.config.extraction.llm.clone();
115 let proxy = self.config.crawler.proxy.clone();
116 let jitter_factor = self.config.crawler.stealth.jitter_factor;
117
118 let handle = tokio::spawn(async move {
119 let _permit = match crawl_semaphore.acquire().await {
120 Ok(p) => p,
121 Err(_) => {
122 let _ = tx.send(CrawlState {
123 id,
124 success: false,
125 status: CrawlStatus::Failed,
126 total: 0,
127 completed: 0,
128 data: vec![],
129 error: Some("Server is overloaded, try again later".into()),
130 });
131 return;
132 }
133 };
134 run_crawl(CrawlOptions {
135 id,
136 req,
137 renderer,
138 max_concurrency,
139 respect_robots,
140 requests_per_second: rps,
141 user_agent: &user_agent,
142 state_tx: tx,
143 llm_config: llm_config.as_ref(),
144 proxy,
145 jitter_factor,
146 })
147 .await;
148 });
149
150 {
152 let mut jobs = self.crawl_jobs.write().await;
153 if let Some(job) = jobs.get_mut(&id) {
154 job.abort_handle = Some(handle.abort_handle());
155 }
156 }
157
158 id
159 }
160}