mod client;
mod pipeline;
mod types;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
pub(crate) use types::{ContentExtraction, ConvertConfig, ResponseMeta};
#[allow(clippy::module_name_repetitions)]
pub use types::{FetchError, FetchResult};
use crate::options::Options;
/// `User-Agent` value used when the builder is not given one: the literal
/// `h2m/` followed by this crate's `CARGO_PKG_VERSION`, assembled at compile time.
const DEFAULT_USER_AGENT: &str = concat!("h2m/", env!("CARGO_PKG_VERSION"));
/// Builder for [`Fetcher`].
///
/// Collects conversion settings and HTTP client settings; `build()` creates
/// the `reqwest` client and moves the remaining settings into the fetcher.
#[derive(Debug)]
pub struct FetcherBuilder {
/// Conversion options, copied into every conversion config.
options: Options,
/// Forwarded to the conversion config's `gfm` flag.
/// NOTE(review): presumably GitHub-Flavored Markdown — confirm in `pipeline`.
gfm: bool,
/// Optional domain string forwarded to the conversion config.
/// NOTE(review): exact use is decided by `pipeline` — confirm there.
domain: Option<String>,
/// Content extraction mode (`Full`, `Selector(..)` or `Readable`).
content: ContentExtraction,
/// Forwarded to the conversion config's `extract_links` flag.
extract_links: bool,
/// Maximum number of fetch tasks in flight in the batch methods
/// (defaults to 4; `build()` raises 0 to 1).
concurrency: usize,
/// Pause inserted before dispatching every URL after the first in the
/// batch methods (defaults to zero, i.e. no pause).
delay: Duration,
/// Timeout handed to the `reqwest` client builder (defaults to 30 seconds).
timeout: Duration,
/// `User-Agent` string handed to the `reqwest` client builder.
user_agent: String,
}
/// Baseline settings: default conversion options and extraction mode, GFM and
/// link extraction switched off, no domain, four concurrent fetches, no delay
/// between dispatches, a 30-second timeout and the crate's own user agent.
impl Default for FetcherBuilder {
    fn default() -> Self {
        // Values that need a call are computed up front so the struct
        // literal below reads as a plain table of defaults.
        let options = Options::default();
        let content = ContentExtraction::default();
        let timeout = Duration::from_secs(30);
        let user_agent = String::from(DEFAULT_USER_AGENT);

        Self {
            options,
            gfm: false,
            domain: None,
            content,
            extract_links: false,
            concurrency: 4,
            delay: Duration::ZERO,
            timeout,
            user_agent,
        }
    }
}
impl FetcherBuilder {
    /// Sets the conversion [`Options`] used by the built [`Fetcher`].
    #[must_use]
    pub const fn options(mut self, options: Options) -> Self {
        self.options = options;
        self
    }

    /// Sets the `gfm` flag forwarded to the conversion config.
    #[must_use]
    pub const fn gfm(mut self, enable: bool) -> Self {
        self.gfm = enable;
        self
    }

    /// Sets the domain string forwarded to the conversion config.
    #[must_use]
    pub fn domain(mut self, domain: impl Into<String>) -> Self {
        self.domain = Some(domain.into());
        self
    }

    /// Switches content extraction to `ContentExtraction::Selector` with the
    /// given selector, replacing whatever mode was configured before.
    #[must_use]
    pub fn selector(mut self, selector: impl Into<String>) -> Self {
        self.content = ContentExtraction::Selector(selector.into());
        self
    }

    /// Enables or disables `ContentExtraction::Readable`.
    ///
    /// `readable(true)` always selects `Readable`. `readable(false)` falls
    /// back to `ContentExtraction::Full`, except when a selector has already
    /// been configured: turning readability off must not silently discard a
    /// selector set earlier in the chain.
    #[must_use]
    pub fn readable(mut self, enable: bool) -> Self {
        if enable {
            self.content = ContentExtraction::Readable;
        } else if !matches!(self.content, ContentExtraction::Selector(_)) {
            self.content = ContentExtraction::Full;
        }
        self
    }

    /// Sets the `extract_links` flag forwarded to the conversion config.
    #[must_use]
    pub const fn extract_links(mut self, enable: bool) -> Self {
        self.extract_links = enable;
        self
    }

    /// Sets how many fetches the batch methods may run at once.
    ///
    /// A value of 0 is raised to 1 by [`FetcherBuilder::build`].
    #[must_use]
    pub const fn concurrency(mut self, n: usize) -> Self {
        self.concurrency = n;
        self
    }

    /// Sets the pause inserted before dispatching each URL after the first
    /// in the batch methods.
    #[must_use]
    pub const fn delay(mut self, delay: Duration) -> Self {
        self.delay = delay;
        self
    }

    /// Sets the timeout passed to the HTTP client builder.
    #[must_use]
    pub const fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Overrides the `User-Agent` string passed to the HTTP client builder.
    #[must_use]
    pub fn user_agent(mut self, ua: impl Into<String>) -> Self {
        self.user_agent = ua.into();
        self
    }

    /// Creates the HTTP client and assembles the [`Fetcher`].
    ///
    /// # Errors
    ///
    /// Returns a [`FetchError`] (with `url` set to `None`) when the `reqwest`
    /// client builder rejects the configuration.
    pub fn build(self) -> Result<Fetcher, FetchError> {
        let client = reqwest::Client::builder()
            .user_agent(&self.user_agent)
            .timeout(self.timeout)
            .build()
            .map_err(|e| FetchError {
                error: format!("failed to build HTTP client: {e}"),
                url: None,
            })?;

        Ok(Fetcher {
            client,
            options: self.options,
            gfm: self.gfm,
            domain: self.domain,
            content: self.content,
            extract_links: self.extract_links,
            // At least one worker is required, otherwise no fetch could ever start.
            concurrency: self.concurrency.max(1),
            delay: self.delay,
        })
    }
}
/// Fetches pages over HTTP and runs them through the conversion pipeline.
///
/// Created through [`FetcherBuilder`]; all settings are fixed after `build()`.
#[derive(Debug)]
pub struct Fetcher {
/// HTTP client built by the builder; cloned into each spawned fetch task.
client: reqwest::Client,
/// Conversion options, copied into every conversion config.
options: Options,
/// Forwarded to the conversion config's `gfm` flag.
gfm: bool,
/// Optional domain string forwarded to the conversion config.
domain: Option<String>,
/// Content extraction mode forwarded to the conversion config.
content: ContentExtraction,
/// Forwarded to the conversion config's `extract_links` flag.
extract_links: bool,
/// Number of semaphore permits, i.e. the maximum number of fetch tasks in
/// flight in the batch methods (the builder guarantees at least 1).
concurrency: usize,
/// Pause inserted before dispatching every URL after the first in the
/// batch methods.
delay: Duration,
}
impl Fetcher {
    /// Returns a [`FetcherBuilder`] initialised with the default settings.
    #[must_use]
    pub fn builder() -> FetcherBuilder {
        FetcherBuilder::default()
    }

    /// Fetches a single URL and converts the response body.
    ///
    /// # Errors
    ///
    /// Returns the [`FetchError`] produced by the HTTP fetch step.
    pub async fn fetch(&self, url: &str) -> Result<FetchResult, FetchError> {
        let cfg = self.config();
        Self::fetch_and_convert(&self.client, url, &cfg).await
    }

    /// Fetches and converts every URL, running a bounded number of requests
    /// at once, and returns the outcomes once all of them have finished.
    ///
    /// Results are returned in the order the URLs were dispatched. When a
    /// non-zero delay is configured it is awaited before dispatching every
    /// URL after the first. A task that panics is reported as a
    /// [`FetchError`] whose `url` is `None`.
    pub async fn fetch_many<S: AsRef<str> + Sync>(
        &self,
        urls: &[S],
    ) -> Vec<Result<FetchResult, FetchError>> {
        let sem = Arc::new(Semaphore::new(self.permits()));
        let cfg = Arc::new(self.config());
        let mut handles = Vec::with_capacity(urls.len());

        for (i, url) in urls.iter().enumerate() {
            if i > 0 && !self.delay.is_zero() {
                tokio::time::sleep(self.delay).await;
            }
            // Acquisition only fails on a closed semaphore, which this
            // method never does; stop dispatching if it happens anyway.
            let Ok(permit) = Arc::clone(&sem).acquire_owned().await else {
                break;
            };
            let owned_url = url.as_ref().to_owned();
            let cli = self.client.clone();
            let cfg_task = Arc::clone(&cfg);
            handles.push(tokio::spawn(async move {
                // Held for the lifetime of the task to bound concurrency.
                let _permit = permit;
                Self::fetch_and_convert(&cli, &owned_url, &cfg_task).await
            }));
        }

        let mut results = Vec::with_capacity(handles.len());
        for handle in handles {
            results.push(
                handle
                    .await
                    .unwrap_or_else(|e| Err(Self::join_failure(&e))),
            );
        }
        results
    }

    /// Fetches and converts every URL with bounded concurrency, handing each
    /// outcome to `on_result` as soon as it is available.
    ///
    /// Outcomes arrive in completion order, not input order. A worker that
    /// panics is reported to `on_result` as a [`FetchError`] whose `url` is
    /// `None`, so the callback is not silently skipped for that URL. If the
    /// returned future is dropped before completion, the background producer
    /// stops dispatching new fetches.
    pub async fn fetch_many_streaming<S, F>(&self, urls: &[S], mut on_result: F)
    where
        S: AsRef<str> + Sync,
        F: FnMut(Result<FetchResult, FetchError>) + Send,
    {
        let permits = self.permits();
        let sem = Arc::new(Semaphore::new(permits));
        // `permits` is capped at half of tokio's limit, so doubling it can
        // neither overflow nor exceed the channel's maximum capacity.
        let (tx, mut rx) =
            tokio::sync::mpsc::channel::<Result<FetchResult, FetchError>>(permits * 2);
        let urls_owned: Vec<String> = urls.iter().map(|s| s.as_ref().to_owned()).collect();
        let client = self.client.clone();
        let cfg = Arc::new(self.config());
        let delay = self.delay;

        let producer = tokio::spawn(async move {
            for (i, url) in urls_owned.into_iter().enumerate() {
                if i > 0 && !delay.is_zero() {
                    tokio::time::sleep(delay).await;
                }
                let Ok(permit) = Arc::clone(&sem).acquire_owned().await else {
                    break;
                };
                // The receiver disappears when the caller's future is
                // dropped; further fetches would have nowhere to go.
                if tx.is_closed() {
                    break;
                }
                let tx_c = tx.clone();
                let cli = client.clone();
                let cfg_task = Arc::clone(&cfg);
                tokio::spawn(async move {
                    // Held until the result has been handed over, so a slow
                    // consumer also throttles new fetches.
                    let _permit = permit;
                    // The actual work runs in its own task so that a panic
                    // surfaces here as a `JoinError` instead of vanishing.
                    let work = tokio::spawn(async move {
                        Self::fetch_and_convert(&cli, &url, &cfg_task).await
                    });
                    let result = work
                        .await
                        .unwrap_or_else(|e| Err(Self::join_failure(&e)));
                    // A send error only means the receiver is gone.
                    let _ = tx_c.send(result).await;
                });
            }
            // `tx` is dropped here; the channel closes once every worker
            // has dropped its clone as well.
        });

        while let Some(result) = rx.recv().await {
            on_result(result);
        }
        let _ = producer.await;
    }

    /// Converts HTML that is already in memory, without any network access.
    ///
    /// No source URL is passed to the pipeline and the response metadata is
    /// `ResponseMeta::default()`.
    #[must_use]
    pub fn convert_html(&self, raw_html: &str) -> FetchResult {
        let start = Instant::now();
        let cfg = self.config();
        pipeline::convert_to_result(None, raw_html, start, &cfg, &ResponseMeta::default())
    }

    /// Snapshots the conversion-related settings into a [`ConvertConfig`].
    fn config(&self) -> ConvertConfig {
        ConvertConfig {
            options: self.options,
            gfm: self.gfm,
            extract_links: self.extract_links,
            domain: self.domain.clone(),
            content: self.content.clone(),
        }
    }

    /// Worker count used by the batch methods.
    ///
    /// `Semaphore::new` and the bounded channel both panic above
    /// `Semaphore::MAX_PERMITS`, and the streaming channel is sized at twice
    /// this value, so the configured concurrency is capped at half that limit.
    fn permits(&self) -> usize {
        self.concurrency.clamp(1, Semaphore::MAX_PERMITS / 2)
    }

    /// Downloads `url` with `http` and runs the conversion pipeline on the
    /// body; the start instant is taken before the request is issued.
    async fn fetch_and_convert(
        http: &reqwest::Client,
        url: &str,
        cfg: &ConvertConfig,
    ) -> Result<FetchResult, FetchError> {
        let start = Instant::now();
        let (raw_html, meta) = client::fetch_html_inner(http, url).await?;
        Ok(pipeline::convert_to_result(
            Some(url),
            &raw_html,
            start,
            cfg,
            &meta,
        ))
    }

    /// Maps a failed task join (panic or cancellation) to a [`FetchError`].
    fn join_failure(e: &tokio::task::JoinError) -> FetchError {
        FetchError {
            error: format!("task panicked: {e}"),
            url: None,
        }
    }
}