Skip to main content

essence/api/
crawl.rs

1use crate::{
2    crawler::{crawl_website, crawl_website_stream, ParallelCrawler},
3    error::ScrapeError,
4    types::{CrawlEvent, CrawlRequest, CrawlResponse},
5    validation,
6};
7use axum::{
8    response::sse::{Event, Sse},
9    Json,
10};
11use futures::stream::{Stream, StreamExt};
12use std::convert::Infallible;
13use std::time::Duration;
14use tokio::sync::mpsc;
15use tokio::time::timeout;
16use tokio_stream::wrappers::ReceiverStream;
17use tracing::{error, info, warn};
18
19/// Handler for POST /api/v1/crawl
20pub async fn crawl_handler(
21    Json(request): Json<CrawlRequest>,
22) -> Result<Json<CrawlResponse>, ScrapeError> {
23    info!("Crawl request received for URL: {}", request.url);
24
25    // Validate request (includes SSRF protection)
26    validation::validate_crawl_request(&request).await?;
27
28    // Log crawl parameters
29    info!(
30        "Crawl parameters - max_depth: {}, limit: {}, allow_backward_links: {:?}, allow_external_links: {:?}",
31        request.max_depth,
32        request.limit,
33        request.allow_backward_links,
34        request.allow_external_links
35    );
36
37    // Execute the crawl with timeout
38    let crawl_timeout = validation::get_crawl_timeout();
39    let use_parallel = request.use_parallel.unwrap_or(false);
40
41    let result = if use_parallel {
42        info!("Using parallel crawler for better performance");
43        let parallel_crawler = ParallelCrawler::new();
44        timeout(crawl_timeout, parallel_crawler.crawl_parallel(&request))
45            .await
46            .map_err(|_| {
47                warn!("Parallel crawl timeout after {:?}", crawl_timeout);
48                ScrapeError::Timeout
49            })?
50            .map_err(|e| {
51                error!("Failed to crawl website {} (parallel): {}", request.url, e);
52                e
53            })
54    } else {
55        timeout(crawl_timeout, crawl_website(&request))
56            .await
57            .map_err(|_| {
58                warn!("Crawl timeout after {:?}", crawl_timeout);
59                ScrapeError::Timeout
60            })?
61            .map_err(|e| {
62                error!("Failed to crawl website {}: {}", request.url, e);
63                e
64            })
65    };
66
67    let documents = result?;
68
69    info!(
70        "Crawl completed for URL: {} - {} pages scraped",
71        request.url,
72        documents.len()
73    );
74
75    Ok(Json(CrawlResponse::success(documents)))
76}
77
78/// Handler for POST /api/v1/crawl/stream - SSE streaming endpoint
79///
80/// Streams crawl events as Server-Sent Events (SSE):
81/// - status: Crawl progress updates
82/// - document: Completed documents
83/// - error: Error events for individual URLs
84/// - complete: Final summary
85///
86/// Example usage with curl:
87/// ```bash
88/// curl -N -X POST http://localhost:8080/api/v1/crawl/stream \
89///   -H "Content-Type: application/json" \
90///   -d '{"url": "https://example.com", "limit": 50, "max_depth": 2}'
91/// ```
92pub async fn crawl_stream_handler(
93    Json(request): Json<CrawlRequest>,
94) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ScrapeError> {
95    info!("Streaming crawl request received for URL: {}", request.url);
96
97    // Validate request (includes SSRF protection)
98    validation::validate_crawl_request(&request).await?;
99
100    // Log crawl parameters
101    info!(
102        "Crawl parameters - max_depth: {}, limit: {}, allow_backward_links: {:?}, allow_external_links: {:?}",
103        request.max_depth,
104        request.limit,
105        request.allow_backward_links,
106        request.allow_external_links
107    );
108
109    // Create channel for streaming events
110    // Buffer size of 100 allows crawler to continue working even if client is slow
111    let (tx, rx) = mpsc::channel::<crate::error::Result<CrawlEvent>>(100);
112
113    // Spawn crawler task in background
114    tokio::spawn(async move {
115        let result = crawl_website_stream(request, tx).await;
116
117        if let Err(e) = result {
118            error!("Streaming crawl failed: {}", e);
119        }
120    });
121
122    // Convert receiver to SSE stream
123    let stream = ReceiverStream::new(rx).map(|event_result| {
124        match event_result {
125            Ok(crawl_event) => {
126                // Serialize event to JSON
127                match serde_json::to_string(&crawl_event) {
128                    Ok(json) => {
129                        // Determine event name based on type
130                        let event_name = match &crawl_event {
131                            CrawlEvent::Status { .. } => "status",
132                            CrawlEvent::Document { .. } => "document",
133                            CrawlEvent::Error { .. } => "error",
134                            CrawlEvent::Complete { .. } => "complete",
135                        };
136
137                        Ok(Event::default()
138                            .event(event_name)
139                            .data(json))
140                    }
141                    Err(e) => {
142                        error!("Failed to serialize crawl event: {}", e);
143                        Ok(Event::default()
144                            .event("error")
145                            .data(format!(r#"{{"type":"error","url":"","error":"Failed to serialize event: {}"}}"#, e)))
146                    }
147                }
148            }
149            Err(e) => {
150                // Send error event
151                Ok(Event::default()
152                    .event("error")
153                    .data(format!(r#"{{"type":"error","url":"","error":"{}"}}"#, e)))
154            }
155        }
156    });
157
158    // Create SSE response with keep-alive
159    Ok(Sse::new(stream).keep_alive(
160        axum::response::sse::KeepAlive::new()
161            .interval(Duration::from_secs(15))
162            .text("keep-alive"),
163    ))
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    #[tokio::test]
171    async fn test_crawl_handler_invalid_url() {
172        let request = CrawlRequest {
173            url: "".to_string(),
174            exclude_paths: None,
175            include_paths: None,
176            max_depth: 2,
177            limit: 100,
178            allow_backward_links: None,
179            allow_external_links: None,
180            ignore_sitemap: None,
181            detect_pagination: None,
182            max_pagination_pages: None,
183            use_parallel: None,
184        };
185
186        let result = crawl_handler(Json(request)).await;
187        assert!(result.is_err());
188    }
189
190    #[tokio::test]
191    async fn test_crawl_stream_handler_invalid_url() {
192        let request = CrawlRequest {
193            url: "".to_string(),
194            exclude_paths: None,
195            include_paths: None,
196            max_depth: 2,
197            limit: 100,
198            allow_backward_links: None,
199            allow_external_links: None,
200            ignore_sitemap: None,
201            detect_pagination: None,
202            max_pagination_pages: None,
203            use_parallel: None,
204        };
205
206        let result = crawl_stream_handler(Json(request)).await;
207        assert!(result.is_err());
208    }
209}