//! crw_server/state.rs — shared application state for the crawl server.
use crw_core::config::AppConfig;
use crw_core::types::{CrawlRequest, CrawlState, CrawlStatus};
use crw_crawl::crawl::{CrawlOptions, run_crawl};
use crw_renderer::FallbackRenderer;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, watch};
use uuid::Uuid;

/// Tracks a crawl job: a watch receiver for progress snapshots plus the
/// creation time used for TTL-based cleanup.
pub struct CrawlJob {
    /// Receives `CrawlState` snapshots published by the running crawl task.
    pub rx: watch::Receiver<CrawlState>,
    /// When the job was registered; the cleanup sweeper compares this against
    /// the configured TTL once the job is Completed/Failed.
    pub created_at: Instant,
    /// Handle to abort the crawl task.
    /// `None` until the task has been spawned and the handle stored.
    pub abort_handle: Option<tokio::task::AbortHandle>,
}

/// Maximum number of concurrent crawl jobs; enforced via `crawl_semaphore`.
const MAX_CONCURRENT_CRAWLS: usize = 10;
/// Interval between expired crawl job cleanup runs.
const JOB_CLEANUP_INTERVAL: Duration = Duration::from_secs(60);

/// Shared application state. Cheap to clone: every field is behind an `Arc`.
#[derive(Clone)]
pub struct AppState {
    /// Immutable server configuration.
    pub config: Arc<AppConfig>,
    /// Page renderer (with fallback), shared by all crawl tasks.
    pub renderer: Arc<FallbackRenderer>,
    /// In-flight and recently finished crawl jobs, keyed by job id.
    pub crawl_jobs: Arc<RwLock<HashMap<Uuid, CrawlJob>>>,
    /// Caps concurrently running crawls at `MAX_CONCURRENT_CRAWLS`.
    pub crawl_semaphore: Arc<tokio::sync::Semaphore>,
}

33impl AppState {
34    pub fn new(config: AppConfig) -> Self {
35        let proxy = config.crawler.proxy.as_deref();
36        let renderer = FallbackRenderer::new(
37            &config.renderer,
38            &config.crawler.user_agent,
39            proxy,
40            &config.crawler.stealth,
41        );
42
43        let state = Self {
44            config: Arc::new(config),
45            renderer: Arc::new(renderer),
46            crawl_jobs: Arc::new(RwLock::new(HashMap::new())),
47            crawl_semaphore: Arc::new(tokio::sync::Semaphore::new(MAX_CONCURRENT_CRAWLS)),
48        };
49
50        // Spawn background job cleanup task.
51        let cleanup_state = state.clone();
52        tokio::spawn(async move {
53            let ttl = Duration::from_secs(cleanup_state.config.crawler.job_ttl_secs);
54            loop {
55                tokio::time::sleep(JOB_CLEANUP_INTERVAL).await;
56                let mut jobs = cleanup_state.crawl_jobs.write().await;
57                let before = jobs.len();
58                jobs.retain(|_id, job| {
59                    let is_done = matches!(
60                        job.rx.borrow().status,
61                        CrawlStatus::Completed | CrawlStatus::Failed
62                    );
63                    // Keep if not done, or if done but within TTL.
64                    !is_done || job.created_at.elapsed() < ttl
65                });
66                let removed = before - jobs.len();
67                if removed > 0 {
68                    tracing::info!(
69                        removed,
70                        remaining = jobs.len(),
71                        "Cleaned up expired crawl jobs"
72                    );
73                }
74            }
75        });
76
77        state
78    }
79
80    /// Start a new crawl job and return its UUID.
81    /// Spawns a background task that acquires the crawl semaphore before running.
82    pub async fn start_crawl_job(&self, req: CrawlRequest) -> Uuid {
83        let id = Uuid::new_v4();
84        let initial = CrawlState {
85            id,
86            success: true,
87            status: CrawlStatus::InProgress,
88            total: 0,
89            completed: 0,
90            data: vec![],
91            error: None,
92        };
93
94        let (tx, rx) = watch::channel(initial);
95
96        {
97            let mut jobs = self.crawl_jobs.write().await;
98            jobs.insert(
99                id,
100                CrawlJob {
101                    rx,
102                    created_at: Instant::now(),
103                    abort_handle: None,
104                },
105            );
106        }
107
108        let renderer = self.renderer.clone();
109        let max_concurrency = self.config.crawler.max_concurrency;
110        let respect_robots = self.config.crawler.respect_robots_txt;
111        let rps = self.config.crawler.requests_per_second;
112        let user_agent = self.config.crawler.user_agent.clone();
113        let crawl_semaphore = self.crawl_semaphore.clone();
114        let llm_config = self.config.extraction.llm.clone();
115        let proxy = self.config.crawler.proxy.clone();
116        let jitter_factor = self.config.crawler.stealth.jitter_factor;
117
118        let handle = tokio::spawn(async move {
119            let _permit = match crawl_semaphore.acquire().await {
120                Ok(p) => p,
121                Err(_) => {
122                    let _ = tx.send(CrawlState {
123                        id,
124                        success: false,
125                        status: CrawlStatus::Failed,
126                        total: 0,
127                        completed: 0,
128                        data: vec![],
129                        error: Some("Server is overloaded, try again later".into()),
130                    });
131                    return;
132                }
133            };
134            run_crawl(CrawlOptions {
135                id,
136                req,
137                renderer,
138                max_concurrency,
139                respect_robots,
140                requests_per_second: rps,
141                user_agent: &user_agent,
142                state_tx: tx,
143                llm_config: llm_config.as_ref(),
144                proxy,
145                jitter_factor,
146            })
147            .await;
148        });
149
150        // Store the abort handle so the job can be cancelled via DELETE.
151        {
152            let mut jobs = self.crawl_jobs.write().await;
153            if let Some(job) = jobs.get_mut(&id) {
154                job.abort_handle = Some(handle.abort_handle());
155            }
156        }
157
158        id
159    }
160}