use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use futures_util::stream::{Stream, StreamExt};
use tokio::sync::mpsc;
use crate::tools::clean::{canonicalize_url, clean_urls};
use crate::tools::fetch::fetch_strategy;
use crate::tools::map::map_children;
use crate::types::{fetch_cache_new, Context, CTX, FETCH_CACHE};
/// Streams the de-duplicated child URLs discovered from the parent `urls`.
///
/// The discovery pipeline (`build_discover_stream`) runs on a detached task
/// started with `tokio::spawn`, so this must be called from within a Tokio
/// runtime. The task executes inside `CTX.scope(..)` (holding `ctx`) and
/// `FETCH_CACHE.scope(..)` (holding a cache freshly created by
/// `fetch_cache_new()`), and forwards every item through a bounded channel.
/// The returned stream ends once the pipeline is exhausted; dropping it makes
/// the producer stop at its next send.
///
/// A `ctx.concurrency` of `0` is treated as `1` for the channel capacity and
/// the pipeline limit, because `tokio::sync::mpsc::channel` panics when asked
/// for a zero-capacity buffer. The `Context` stored in `CTX` is left untouched.
pub fn qrawl_discover_children(
    urls: Vec<String>,
    ctx: Context,
) -> impl Stream<Item = String> + Send + 'static {
    // Clamp before creating the channel: a zero capacity would panic here,
    // synchronously, in the caller.
    let concurrency = ctx.concurrency.max(1);
    let (tx, rx) = mpsc::channel::<String>(concurrency);
    let ctx_arc = Arc::new(ctx);
    let cache = fetch_cache_new();

    // The JoinHandle is intentionally dropped; the task ends when the pipeline
    // finishes or the receiver goes away.
    tokio::spawn(async move {
        CTX.scope(ctx_arc, async move {
            FETCH_CACHE
                .scope(cache, async move {
                    let stream = build_discover_stream(urls, concurrency);
                    pump(stream, tx).await;
                })
                .await;
        })
        .await;
    });

    // Adapt the receiver into a `Stream` that yields until the sender is dropped.
    futures_util::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    })
}
/// Fetches every URL produced by `urls` and streams `(url, html)` pairs.
///
/// The fetch pipeline (`build_fetch_stream`) runs on a detached task started
/// with `tokio::spawn`, so this must be called from within a Tokio runtime.
/// The task executes inside `CTX.scope(..)` (holding `ctx`) and
/// `FETCH_CACHE.scope(..)` (holding a cache freshly created by
/// `fetch_cache_new()`), and forwards every pair through a bounded channel.
/// URLs whose fetch fails are skipped rather than reported. The returned
/// stream ends once the pipeline is exhausted; dropping it makes the producer
/// stop at its next send.
///
/// A `ctx.concurrency` of `0` is treated as `1` for the channel capacity and
/// the pipeline limit, because `tokio::sync::mpsc::channel` panics when asked
/// for a zero-capacity buffer. The `Context` stored in `CTX` is left untouched.
pub fn qrawl_fetch_stream<S>(
    urls: S,
    ctx: Context,
) -> impl Stream<Item = (String, String)> + Send + 'static
where
    S: Stream<Item = String> + Send + 'static,
{
    // Clamp before creating the channel: a zero capacity would panic here,
    // synchronously, in the caller.
    let concurrency = ctx.concurrency.max(1);
    let (tx, rx) = mpsc::channel::<(String, String)>(concurrency);
    let ctx_arc = Arc::new(ctx);
    let cache = fetch_cache_new();

    // The JoinHandle is intentionally dropped; the task ends when the pipeline
    // finishes or the receiver goes away.
    tokio::spawn(async move {
        CTX.scope(ctx_arc, async move {
            FETCH_CACHE
                .scope(cache, async move {
                    let stream = build_fetch_stream(urls, concurrency);
                    pump(stream, tx).await;
                })
                .await;
        })
        .await;
    });

    // Adapt the receiver into a `Stream` that yields until the sender is dropped.
    futures_util::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    })
}
/// Discovers the children of the parent `urls`, fetches each child, and
/// streams the resulting `(child_url, html)` pairs.
///
/// Discovery (`build_discover_stream`) feeds directly into fetching
/// (`build_fetch_stream`) on a single detached task started with
/// `tokio::spawn`, so this must be called from within a Tokio runtime. Both
/// stages run inside the same `CTX.scope(..)` / `FETCH_CACHE.scope(..)` and
/// therefore share the one cache created by `fetch_cache_new()`. Children
/// whose fetch fails are skipped. The returned stream ends once the pipeline
/// is exhausted; dropping it makes the producer stop at its next send.
///
/// A `ctx.concurrency` of `0` is treated as `1` for the channel capacity and
/// both pipeline limits, because `tokio::sync::mpsc::channel` panics when
/// asked for a zero-capacity buffer. The `Context` stored in `CTX` is left
/// untouched.
pub fn qrawl_children_stream(
    urls: Vec<String>,
    ctx: Context,
) -> impl Stream<Item = (String, String)> + Send + 'static {
    // Clamp before creating the channel: a zero capacity would panic here,
    // synchronously, in the caller.
    let concurrency = ctx.concurrency.max(1);
    let (tx, rx) = mpsc::channel::<(String, String)>(concurrency);
    let ctx_arc = Arc::new(ctx);
    let cache = fetch_cache_new();

    // The JoinHandle is intentionally dropped; the task ends when the pipeline
    // finishes or the receiver goes away.
    tokio::spawn(async move {
        CTX.scope(ctx_arc, async move {
            FETCH_CACHE
                .scope(cache, async move {
                    let discover = build_discover_stream(urls, concurrency);
                    let stream = build_fetch_stream(discover, concurrency);
                    pump(stream, tx).await;
                })
                .await;
        })
        .await;
    });

    // Adapt the receiver into a `Stream` that yields until the sender is dropped.
    futures_util::stream::unfold(rx, |mut rx| async move {
        rx.recv().await.map(|item| (item, rx))
    })
}
/// Collects everything produced by [`qrawl_children_stream`] into a `Vec`.
///
/// Waits for the whole stream to finish before returning. The result is
/// always `Ok`; the `Result` wrapper exists only in the signature.
pub async fn qrawl_children(
    urls: Vec<String>,
    ctx: Context,
) -> Result<Vec<(String, String)>, String> {
    let pages: Vec<(String, String)> = qrawl_children_stream(urls, ctx).collect().await;
    Ok(pages)
}
/// Builds the lazy discovery pipeline: parent URLs in, unique canonical child
/// URLs out.
///
/// Stages, in order:
/// 1. `clean_urls` is applied to `urls` to obtain the parents.
/// 2. Each parent is fetched with `fetch_strategy`, at most `concurrency` at a
///    time; parents whose fetch fails are dropped.
/// 3. `map_children` runs on each fetched page, at most
///    `min(available_parallelism, concurrency)` at a time (falling back to
///    `concurrency` when the parallelism cannot be queried).
/// 4. Every child is passed through `canonicalize_url` and emitted only the
///    first time its canonical form is seen across the whole stream.
///
/// Both buffered stages use `buffer_unordered`, so output order follows
/// completion order rather than input order.
fn build_discover_stream(
    urls: Vec<String>,
    concurrency: usize,
) -> impl Stream<Item = String> + Send + 'static {
    let parents = clean_urls(&urls);
    let parse_concurrency = std::thread::available_parallelism()
        .map_or(concurrency, |cores| cores.get().min(concurrency));
    // Canonical URLs already emitted; shared by every batch of children.
    let seen = Arc::new(Mutex::new(HashSet::<String>::new()));

    // Stage 1 + 2: fetch each parent, silently skipping failures.
    let parent_pages = futures_util::stream::iter(parents)
        .map(|url| async move {
            let html = fetch_strategy(&url).await.ok()?;
            Some((url, html))
        })
        .buffer_unordered(concurrency)
        .filter_map(|page| async move { page });

    // Stage 3: extract the children of every fetched parent.
    let child_batches = parent_pages
        .map(|(parent_url, parent_html)| async move {
            map_children(&parent_html, &parent_url).await
        })
        .buffer_unordered(parse_concurrency);

    // Stage 4: canonicalize and keep only first occurrences.
    child_batches.flat_map(move |children| {
        let fresh: Vec<String> = children
            .into_iter()
            .map(|child| canonicalize_url(&child))
            // `insert` returns true only for values not yet in the set.
            .filter(|canonical| seen.lock().unwrap().insert(canonical.clone()))
            .collect();
        futures_util::stream::iter(fresh)
    })
}
/// Builds the lazy fetch pipeline: URLs in, `(url, html)` pairs out.
///
/// Each incoming URL is fetched with `fetch_strategy`, with at most
/// `concurrency` fetches in flight. URLs whose fetch returns an error are
/// dropped from the output. Because `buffer_unordered` is used, pairs are
/// yielded in completion order rather than input order.
fn build_fetch_stream<S>(
    urls: S,
    concurrency: usize,
) -> impl Stream<Item = (String, String)> + Send + 'static
where
    S: Stream<Item = String> + Send + 'static,
{
    /// Fetches one URL, returning `None` when the fetch fails.
    async fn fetch_pair(child_url: String) -> Option<(String, String)> {
        let html = fetch_strategy(&child_url).await.ok()?;
        Some((child_url, html))
    }

    urls.map(fetch_pair)
        .buffer_unordered(concurrency)
        // Unwrap the `Some` results and discard the failed fetches.
        .filter_map(std::future::ready)
}
/// Drains `stream` into `tx`, one item at a time.
///
/// Returns when the stream is exhausted or as soon as a send fails, which
/// happens once the receiving half of the channel has been closed or dropped.
async fn pump<T, S>(stream: S, tx: mpsc::Sender<T>)
where
    S: Stream<Item = T>,
    T: Send + 'static,
{
    // Pin on the heap so `next()` can be called without requiring `S: Unpin`.
    let mut items = Box::pin(stream);
    loop {
        match items.next().await {
            Some(item) => {
                if tx.send(item).await.is_err() {
                    // Nobody is listening any more; stop producing.
                    return;
                }
            }
            None => return,
        }
    }
}
/// Runs the `chain!` pipeline below over `urls` with `ctx` and returns the
/// second element of every resulting pair.
///
/// The stages are listed in the order they are written: `clean_urls`,
/// `fetch_strategy`, `map_children`, `clean_urls`, `fetch_strategy`,
/// `map_page`, `clean_urls`, `fetch_strategy`, `extract_emails`,
/// `clean_emails`. Presumably this walks parent -> children -> pages and pulls
/// e-mail addresses out of the final pages — the exact data flow depends on
/// how `chain!` expands, which is not visible here; confirm against the macro.
///
/// The result is always `Ok`; the `Result` wrapper exists only in the
/// signature.
///
/// NOTE(review): `chain!`, `map_page`, `extract_emails` and `clean_emails` are
/// not imported in the visible part of this file — presumably they are brought
/// into scope elsewhere or resolved by the macro itself; verify.
pub async fn qrawl_emails(urls: Vec<String>, ctx: Context) -> Result<Vec<String>, String> {
// The macro expression is awaited, so it evaluates to a future.
let result = chain! {
urls, ctx =>
clean_urls ->
fetch_strategy ->
map_children ->
clean_urls ->
fetch_strategy ->
map_page ->
clean_urls ->
fetch_strategy ->
extract_emails ->
clean_emails
}
.await;
// Each item is a pair; keep only its second element and discard the first.
Ok(result.into_iter().map(|(_, email)| email).collect())
}