use crate::client::HttpClient;
use crate::config::Config;
use crate::error::Result;
use crate::extractor::{DataExtractor};
use crate::types::ExtractionRule;
use crate::html_parser::HtmlParser;
use crate::types::{HttpMethod, ScrapedData, RequestStats};
use futures::stream::{self, StreamExt};
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
/// High-level web scraper: pairs an HTTP client with an HTML data
/// extractor driven by named [`ExtractionRule`]s.
#[derive(Debug, Clone)]
pub struct FerrisFetcher {
    // Performs the actual requests; also owns request statistics and
    // rate limiting (see `get_stats` / `has_rate_limiting`).
    client: HttpClient,
    // Applies the configured extraction rules to parsed pages.
    extractor: DataExtractor,
    // Retained so the `scrape_multiple*` methods can read
    // `max_concurrent_requests`.
    config: Config,
}
impl FerrisFetcher {
pub fn new() -> Result<Self> {
let config = Config::default();
Self::with_config(config)
}
pub fn with_config(config: Config) -> Result<Self> {
let client = HttpClient::new(config.clone())?;
let extractor = DataExtractor::new();
Ok(Self {
client,
extractor,
config,
})
}
pub fn with_config_and_rules(config: Config, rules: Vec<ExtractionRule>) -> Result<Self> {
let client = HttpClient::new(config.clone())?;
let extractor = DataExtractor::with_rules(rules);
Ok(Self {
client,
extractor,
config,
})
}
pub async fn scrape(&self, url: &str) -> Result<ScrapedData> {
self.scrape_with_method(url, HttpMethod::Get, None).await
}
pub async fn scrape_with_method(&self, url: &str, method: HttpMethod, body: Option<String>) -> Result<ScrapedData> {
let start_time = Instant::now();
info!("Starting scrape of: {}", url);
let response = self.client.request(url, method, body, None).await?;
let status_code = response.status().as_u16();
let headers: std::collections::HashMap<String, String> = response
.headers()
.iter()
.map(|(name, value)| (name.to_string(), value.to_str().unwrap_or("").to_string()))
.collect();
let content = response.text().await?;
let parser = HtmlParser::new(&content)?;
let mut scraped_data = ScrapedData::new(url.to_string());
scraped_data.status_code = status_code;
scraped_data.headers = headers;
scraped_data.content = content.clone();
scraped_data.scrape_time_ms = start_time.elapsed().as_millis() as u64;
self.extract_basic_metadata(&parser, &mut scraped_data);
if self.extractor.rule_count() > 0 {
match self.extractor.extract_all(&parser) {
Ok(extracted_data) => {
scraped_data.extracted_data = extracted_data;
debug!("Extracted data for {} fields", scraped_data.extracted_data.len());
}
Err(e) => {
warn!("Failed to extract structured data: {}", e);
}
}
}
info!("Successfully scraped: {} ({}ms)", url, scraped_data.scrape_time_ms);
Ok(scraped_data)
}
pub async fn scrape_multiple(&self, urls: &[&str]) -> Result<Vec<ScrapedData>> {
info!("Starting concurrent scrape of {} URLs", urls.len());
let start_time = Instant::now();
let concurrency_limit = self.config.max_concurrent_requests;
let results = stream::iter(urls)
.map(|url| async move {
let scrape_start = Instant::now();
match self.scrape(url).await {
Ok(data) => {
debug!("Successfully scraped: {} ({}ms)", url, scrape_start.elapsed().as_millis());
Some(data)
}
Err(e) => {
error!("Failed to scrape {}: {}", url, e);
None
}
}
})
.buffer_unordered(concurrency_limit)
.collect::<Vec<_>>()
.await;
let successful_results: Vec<ScrapedData> = results.into_iter().flatten().collect();
let elapsed = start_time.elapsed();
info!("Completed scraping: {}/{} URLs in {}ms",
successful_results.len(),
urls.len(),
elapsed.as_millis());
Ok(successful_results)
}
pub async fn scrape_multiple_with_progress<F>(
&self,
urls: &[&str],
progress_callback: F
) -> Result<Vec<ScrapedData>>
where
F: Fn(usize, usize, &ScrapedData) + Send + Sync + 'static,
{
info!("Starting concurrent scrape of {} URLs with progress reporting", urls.len());
let concurrency_limit = self.config.max_concurrent_requests;
let total_urls = urls.len();
let (tx, mut rx) = mpsc::channel::<(usize, ScrapedData)>(concurrency_limit);
let progress_callback = Arc::new(progress_callback);
let progress_task = tokio::spawn(async move {
let mut _completed = 0;
while let Some((index, data)) = rx.recv().await {
_completed += 1;
progress_callback(index, total_urls, &data);
}
});
let results = stream::iter(urls.iter().enumerate())
.map(|(index, url)| {
let tx = tx.clone();
async move {
match self.scrape(url).await {
Ok(data) => {
let _ = tx.send((index, data.clone())).await;
Some(data)
}
Err(e) => {
error!("Failed to scrape {}: {}", url, e);
None
}
}
}
})
.buffer_unordered(concurrency_limit)
.collect::<Vec<_>>()
.await;
drop(tx); let _ = progress_task.await;
let successful_results: Vec<ScrapedData> = results.into_iter().flatten().collect();
info!("Completed scraping: {}/{} URLs", successful_results.len(), total_urls);
Ok(successful_results)
}
pub fn add_extraction_rule(&mut self, rule: ExtractionRule) {
self.extractor.add_rule(rule);
}
pub fn remove_extraction_rule(&mut self, name: &str) -> Option<ExtractionRule> {
self.extractor.remove_rule(name)
}
pub fn extraction_rules(&self) -> &std::collections::HashMap<String, ExtractionRule> {
self.extractor.rules()
}
pub async fn get_stats(&self) -> RequestStats {
self.client.get_stats().await
}
pub async fn reset_stats(&self) {
self.client.reset_stats().await;
}
pub fn config(&self) -> &Config {
&self.config
}
fn extract_basic_metadata(&self, parser: &HtmlParser, scraped_data: &mut ScrapedData) {
scraped_data.title = parser.title();
if let Some(description) = parser.description() {
scraped_data.add_metadata("description", description.into());
}
if let Some(keywords) = parser.keywords() {
scraped_data.add_metadata("keywords", keywords.into());
}
if let Some(canonical_url) = parser.canonical_url() {
scraped_data.add_metadata("canonical_url", canonical_url.into());
}
let json_ld = parser.json_ld();
if !json_ld.is_empty() {
scraped_data.add_metadata("json_ld", json_ld.into());
}
let links_count = parser.links().len();
let images_count = parser.images().len();
scraped_data.add_metadata("links_count", (links_count as u64).into());
scraped_data.add_metadata("images_count", (images_count as u64).into());
let forms_count = parser.forms().len();
scraped_data.add_metadata("forms_count", (forms_count as u64).into());
}
pub async fn scrape_and_extract(&self, url: &str, rule_name: &str) -> Result<Vec<String>> {
let scraped_data = self.scrape(url).await?;
let parser = HtmlParser::new(&scraped_data.content)?;
self.extractor.extract_by_name(&parser, rule_name)
}
pub async fn scrape_and_extract_single(&self, url: &str, rule_name: &str) -> Option<String> {
match self.scrape_and_extract(url, rule_name).await {
Ok(values) => values.into_iter().next(),
Err(_) => None,
}
}
pub fn has_rate_limiting(&self) -> bool {
self.client.has_rate_limiting()
}
pub fn max_concurrent_requests(&self) -> usize {
self.client.max_concurrent_requests()
}
}
/// Builds a fetcher from `Config::default()`.
///
/// # Panics
/// Panics if the default HTTP client cannot be constructed; use
/// [`FerrisFetcher::new`] to handle that error instead.
impl Default for FerrisFetcher {
    fn default() -> Self {
        Self::new().expect("Failed to create default FerrisFetcher")
    }
}
use std::sync::Arc;
/// Fluent builder for [`FerrisFetcher`]: accumulates configuration tweaks
/// and extraction rules, then constructs the fetcher via `build`.
pub struct FerrisFetcherBuilder {
    // Base configuration, refined by the chained setter methods.
    config: Config,
    // Extraction rules handed to the fetcher at build time.
    rules: Vec<ExtractionRule>,
}
impl FerrisFetcherBuilder {
    /// Starts a builder with the default configuration and no rules.
    pub fn new() -> Self {
        Self {
            config: Config::default(),
            rules: Vec::new(),
        }
    }

    /// Replaces the entire configuration.
    pub fn config(self, config: Config) -> Self {
        Self { config, rules: self.rules }
    }

    /// Queues a single extraction rule.
    pub fn add_rule(self, rule: ExtractionRule) -> Self {
        let Self { config, mut rules } = self;
        rules.push(rule);
        Self { config, rules }
    }

    /// Queues a batch of extraction rules.
    pub fn add_rules(self, rules: Vec<ExtractionRule>) -> Self {
        let Self { config, rules: mut queued } = self;
        queued.extend(rules);
        Self { config, rules: queued }
    }

    /// Sets the User-Agent used for all requests.
    pub fn user_agent(self, user_agent: &str) -> Self {
        let Self { config, rules } = self;
        Self { config: config.with_user_agent(user_agent), rules }
    }

    /// Sets the per-request timeout.
    pub fn timeout(self, timeout: std::time::Duration) -> Self {
        let Self { config, rules } = self;
        Self { config: config.with_timeout(timeout), rules }
    }

    /// Caps the number of simultaneously in-flight requests.
    pub fn max_concurrent_requests(self, max: usize) -> Self {
        let Self { config, rules } = self;
        Self { config: config.with_max_concurrent_requests(max), rules }
    }

    /// Disables request rate limiting.
    pub fn without_rate_limit(self) -> Self {
        let Self { config, rules } = self;
        Self { config: config.without_rate_limit(), rules }
    }

    /// Sets the retry policy for failed requests.
    pub fn retry_policy(self, retry_policy: crate::types::RetryPolicy) -> Self {
        let Self { config, rules } = self;
        Self { config: config.with_retry_policy(retry_policy), rules }
    }

    /// Adds a default header to every request.
    ///
    /// # Errors
    /// Returns an error if the header name or value is invalid.
    pub fn header(self, name: &str, value: &str) -> Result<Self> {
        let Self { config, rules } = self;
        Ok(Self { config: config.with_header(name, value)?, rules })
    }

    /// Routes all requests through the given proxy.
    pub fn proxy(self, proxy: url::Url) -> Self {
        let Self { config, rules } = self;
        Self { config: config.with_proxy(proxy), rules }
    }

    /// Disables following HTTP redirects.
    pub fn without_redirects(self) -> Self {
        let Self { config, rules } = self;
        Self { config: config.without_redirects(), rules }
    }

    /// Disables the cookie store.
    pub fn without_cookies(self) -> Self {
        let Self { config, rules } = self;
        Self { config: config.without_cookies(), rules }
    }

    /// Consumes the builder, producing a configured [`FerrisFetcher`].
    ///
    /// # Errors
    /// Returns an error if the underlying HTTP client cannot be built.
    pub fn build(self) -> Result<FerrisFetcher> {
        FerrisFetcher::with_config_and_rules(self.config, self.rules)
    }
}
impl Default for FerrisFetcherBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity check: a default-configured fetcher builds successfully, with
    // rate limiting enabled and the default concurrency limit of 10.
    // NOTE(review): the expected values here mirror `Config::default()` —
    // update this test if those defaults change.
    #[tokio::test]
    async fn test_ferrisfetcher_creation() {
        let fetcher = FerrisFetcher::new().unwrap();
        assert!(fetcher.has_rate_limiting());
        assert_eq!(fetcher.max_concurrent_requests(), 10);
    }
}