1use crate::{
2 crawler::{crawl_website, crawl_website_stream, ParallelCrawler},
3 error::ScrapeError,
4 types::{CrawlEvent, CrawlRequest, CrawlResponse},
5 validation,
6};
7use axum::{
8 response::sse::{Event, Sse},
9 Json,
10};
11use futures::stream::{Stream, StreamExt};
12use std::convert::Infallible;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tokio::time::timeout;
16use tokio_stream::wrappers::ReceiverStream;
17use tracing::{error, info, warn};
18
19pub async fn crawl_handler(
21 Json(request): Json<CrawlRequest>,
22) -> Result<Json<CrawlResponse>, ScrapeError> {
23 info!("Crawl request received for URL: {}", request.url);
24
25 validation::validate_crawl_request(&request).await?;
27
28 info!(
30 "Crawl parameters - max_depth: {}, limit: {}, allow_backward_links: {:?}, allow_external_links: {:?}",
31 request.max_depth,
32 request.limit,
33 request.allow_backward_links,
34 request.allow_external_links
35 );
36
37 let crawl_timeout = validation::get_crawl_timeout();
39 let use_parallel = request.use_parallel.unwrap_or(false);
40
41 let result = if use_parallel {
42 info!("Using parallel crawler for better performance");
43 let parallel_crawler = ParallelCrawler::new();
44 timeout(crawl_timeout, parallel_crawler.crawl_parallel(&request))
45 .await
46 .map_err(|_| {
47 warn!("Parallel crawl timeout after {:?}", crawl_timeout);
48 ScrapeError::Timeout
49 })?
50 .map_err(|e| {
51 error!("Failed to crawl website {} (parallel): {}", request.url, e);
52 e
53 })
54 } else {
55 timeout(crawl_timeout, crawl_website(&request))
56 .await
57 .map_err(|_| {
58 warn!("Crawl timeout after {:?}", crawl_timeout);
59 ScrapeError::Timeout
60 })?
61 .map_err(|e| {
62 error!("Failed to crawl website {}: {}", request.url, e);
63 e
64 })
65 };
66
67 let documents = result?;
68
69 info!(
70 "Crawl completed for URL: {} - {} pages scraped",
71 request.url,
72 documents.len()
73 );
74
75 Ok(Json(CrawlResponse::success(documents)))
76}
77
78pub async fn crawl_stream_handler(
93 Json(request): Json<CrawlRequest>,
94) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ScrapeError> {
95 info!("Streaming crawl request received for URL: {}", request.url);
96
97 validation::validate_crawl_request(&request).await?;
99
100 info!(
102 "Crawl parameters - max_depth: {}, limit: {}, allow_backward_links: {:?}, allow_external_links: {:?}",
103 request.max_depth,
104 request.limit,
105 request.allow_backward_links,
106 request.allow_external_links
107 );
108
109 let (tx, rx) = mpsc::channel::<crate::error::Result<CrawlEvent>>(100);
112
113 tokio::spawn(async move {
115 let result = crawl_website_stream(request, tx).await;
116
117 if let Err(e) = result {
118 error!("Streaming crawl failed: {}", e);
119 }
120 });
121
122 let stream = ReceiverStream::new(rx).map(|event_result| {
124 match event_result {
125 Ok(crawl_event) => {
126 match serde_json::to_string(&crawl_event) {
128 Ok(json) => {
129 let event_name = match &crawl_event {
131 CrawlEvent::Status { .. } => "status",
132 CrawlEvent::Document { .. } => "document",
133 CrawlEvent::Error { .. } => "error",
134 CrawlEvent::Complete { .. } => "complete",
135 };
136
137 Ok(Event::default()
138 .event(event_name)
139 .data(json))
140 }
141 Err(e) => {
142 error!("Failed to serialize crawl event: {}", e);
143 Ok(Event::default()
144 .event("error")
145 .data(format!(r#"{{"type":"error","url":"","error":"Failed to serialize event: {}"}}"#, e)))
146 }
147 }
148 }
149 Err(e) => {
150 Ok(Event::default()
152 .event("error")
153 .data(format!(r#"{{"type":"error","url":"","error":"{}"}}"#, e)))
154 }
155 }
156 });
157
158 Ok(Sse::new(stream).keep_alive(
160 axum::response::sse::KeepAlive::new()
161 .interval(Duration::from_secs(15))
162 .text("keep-alive"),
163 ))
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a crawl request with an empty URL and every optional setting unset.
    fn empty_url_request() -> CrawlRequest {
        CrawlRequest {
            url: String::new(),
            exclude_paths: None,
            include_paths: None,
            max_depth: 2,
            limit: 100,
            allow_backward_links: None,
            allow_external_links: None,
            ignore_sitemap: None,
            detect_pagination: None,
            max_pagination_pages: None,
            use_parallel: None,
        }
    }

    /// The blocking handler must return an error for a request with an empty URL.
    #[tokio::test]
    async fn test_crawl_handler_invalid_url() {
        let outcome = crawl_handler(Json(empty_url_request())).await;
        assert!(outcome.is_err());
    }

    /// The streaming handler must return an error for a request with an empty URL.
    #[tokio::test]
    async fn test_crawl_stream_handler_invalid_url() {
        let outcome = crawl_stream_handler(Json(empty_url_request())).await;
        assert!(outcome.is_err());
    }
}