use async_trait::async_trait;
use exn::{Exn, OptionExt, ResultExt};
use futures_core::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use reqwest::Client;
use crate::{
crawl,
crawler::{CrawlerError, ProgressManager},
error::ErrorStatus,
filter::FileFilter,
Dataset, Entry,
};
use bytes::Buf;
use digest::Digest;
use std::{fs, path::Path};
use tokio::{fs::OpenOptions, io::AsyncWriteExt};
use tracing::{debug, instrument, warn};
use crate::{Checksum, Hasher};
impl Dataset {
    /// Crawl the dataset and print every entry's metadata to stdout,
    /// returning the number of file entries seen (directories are printed
    /// but not counted).
    ///
    /// * `client` - HTTP client used for crawling.
    /// * `mp` - progress container used while crawling.
    /// * `limit` - maximum number of entries handled concurrently.
    /// * `filter` - optional filter; when `Some`, only matching files are
    ///   printed (directories always pass so traversal continues).
    ///
    /// # Errors
    /// Returns a permanent `CrawlerError` when the crawl stream fails.
    pub async fn print_meta(
        &self,
        client: &Client,
        mp: MultiProgress,
        limit: usize,
        filter: Option<&FileFilter>,
    ) -> Result<usize, Exn<CrawlerError>> {
        let root_dir = self.root_dir();
        let file_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&file_count);
        crawl(client.clone(), Arc::clone(&self.backend), root_dir, mp)
            .try_filter(move |entry| {
                let pass = match entry {
                    // Directories always pass so the crawl keeps descending.
                    Entry::Dir(_) => true,
                    Entry::File(file_meta) => match filter {
                        Some(filter) => filter.matches(file_meta.relative().as_str()),
                        None => true,
                    },
                };
                futures_util::future::ready(pass)
            })
            .try_for_each_concurrent(limit, |entry| {
                let counter = Arc::clone(&counter);
                async move {
                    match entry {
                        Entry::Dir(dir_meta) => {
                            println!("{dir_meta}");
                        }
                        Entry::File(file_meta) => {
                            // Only files count toward the returned total.
                            counter.fetch_add(1, Ordering::Relaxed);
                            println!("{file_meta}");
                        }
                    }
                    Ok(())
                }
            })
            .await
            .or_raise(|| CrawlerError {
                // Fixed copy-pasted message: this method only crawls and
                // prints metadata; it performs no download or validation.
                message: "crawl and print metadata failed".to_string(),
                status: ErrorStatus::Permanent,
            })?;
        Ok(file_count.load(Ordering::Relaxed))
    }
}
/// Download a single crawled entry beneath `dst`.
///
/// Directories are created on disk; files are streamed from
/// `file_meta.download_url()` into `dst/<relative path>` while progress is
/// rendered through `mp`. When the crawled metadata provides an expected
/// size and/or checksum (SHA-256 preferred, otherwise the first available),
/// the downloaded bytes are validated against them.
///
/// # Errors
/// Returns a `CrawlerError` when directory creation, the HTTP request,
/// filesystem writes, or size/checksum validation fails. HTTP failures are
/// marked `Temporary`; everything else is `Permanent`.
#[allow(clippy::too_many_lines)]
#[instrument(skip(client, mp))]
async fn download_crawled_file_with_validation<P>(
    client: &Client,
    src: Entry,
    dst: P,
    mp: impl ProgressManager,
) -> Result<(), Exn<CrawlerError>>
where
    P: AsRef<Path> + std::fmt::Debug,
{
    debug!("downloading with validating");
    match src {
        Entry::Dir(dir_meta) => {
            let path = dst.as_ref().join(dir_meta.relative());
            fs::create_dir_all(path.as_path()).or_raise(|| CrawlerError {
                message: format!("cannot create dir {}", path.display()),
                status: ErrorStatus::Permanent,
            })?;
            Ok(())
        }
        Entry::File(file_meta) => {
            // Spinner shown while the connection is being established.
            let pb = mp.insert(0, ProgressBar::new_spinner());
            pb.set_style(
                ProgressStyle::with_template("{spinner:.green} {msg}")
                    .expect("indicatif template error"),
            );
            pb.enable_steady_tick(std::time::Duration::from_millis(100));
            pb.set_message(format!(
                "Connecting... {}",
                file_meta.download_url().as_str()
            ));
            if !file_meta.is_downloadable() {
                pb.set_message(format!(
                    "{} is not downloadable",
                    file_meta.download_url().as_str()
                ));
                // Best-effort: skipping a non-downloadable file is not an error.
                return Ok(());
            }
            let resp = client
                .get(file_meta.download_url())
                .send()
                .await
                .or_raise(|| CrawlerError {
                    message: format!("fail to send http GET to {}", file_meta.download_url()),
                    status: ErrorStatus::Temporary,
                })?
                .error_for_status()
                .or_raise(|| CrawlerError {
                    // Fixed: this branch means the request WAS sent but the
                    // server answered with a non-success status.
                    message: format!(
                        "http GET to {} returned an error status",
                        file_meta.download_url()
                    ),
                    status: ErrorStatus::Temporary,
                })?;
            pb.finish_and_clear();
            let mut stream = resp.bytes_stream();
            let path = dst.as_ref().join(file_meta.relative());
            let parent_dir = path.parent().ok_or_raise(|| CrawlerError {
                // Fixed typo: "connot" -> "cannot".
                message: format!("cannot get parent dir for '{}'", path.display()),
                status: ErrorStatus::Permanent,
            })?;
            fs::create_dir_all(parent_dir).or_raise(|| CrawlerError {
                // Fixed typo: "connot" -> "cannot".
                message: format!("cannot create folder dir of '{}'", parent_dir.display()),
                status: ErrorStatus::Permanent,
            })?;
            let mut fh = OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path.as_path())
                .await
                .or_raise(|| CrawlerError {
                    message: format!("fail on create file at {}", path.display()),
                    status: ErrorStatus::Permanent,
                })?;
            // Prefer SHA-256 when several checksums are offered; otherwise
            // fall back to whichever comes first.
            let checksum = file_meta
                .checksum()
                .iter()
                .find(|c| matches!(c, Checksum::Sha256(_)))
                .or_else(|| file_meta.checksum().first());
            let expected_size = file_meta.size();
            let (mut hasher, expected_checksum) = if let Some(checksum) = checksum {
                match checksum {
                    Checksum::Sha256(value) => {
                        (Some(Hasher::Sha256(sha2::Sha256::new())), Some(value))
                    }
                    Checksum::Md5(value) => (Some(Hasher::Md5(md5::Md5::new())), Some(value)),
                    Checksum::Sha1(value) => (Some(Hasher::Sha1(sha1::Sha1::new())), Some(value)),
                }
            } else {
                warn!("unable to find expected checksum to verify");
                (None, None)
            };
            let style = ProgressStyle::with_template(
                "{msg:<60} [{bar:40.cyan/blue}] \
                {decimal_bytes:>8}/{decimal_total_bytes:>8} \
                ({decimal_bytes_per_sec:>12}, {eta:>3})",
            )
            // Consistency: use the same expect message as the spinner
            // template above instead of a bare unwrap().
            .expect("indicatif template error")
            .progress_chars("=>-");
            let pb = if let Some(expected_size) = expected_size {
                mp.insert_from_back(0, ProgressBar::new(expected_size))
            } else {
                mp.insert_from_back(0, ProgressBar::no_length())
            };
            pb.set_style(style);
            pb.enable_steady_tick(std::time::Duration::from_millis(100));
            pb.set_message(compact_path(file_meta.relative().as_str()));
            let mut got_size = 0;
            while let Some(item) = stream.next().await {
                let mut bytes = item.or_raise(|| CrawlerError {
                    message: "reqwest error stream".to_string(),
                    status: ErrorStatus::Permanent,
                })?;
                // Hash the chunk BEFORE write_all_buf advances the buffer,
                // otherwise the hasher would see an empty slice.
                let chunk = bytes.chunk();
                if let Some(ref mut hasher) = hasher {
                    hasher.update(chunk);
                }
                let bytes_len = bytes.len() as u64;
                got_size += bytes_len;
                fh.write_all_buf(&mut bytes)
                    .await
                    .or_raise(|| CrawlerError {
                        message: "fail at writing to fs".to_string(),
                        status: ErrorStatus::Permanent,
                    })?;
                pb.inc(bytes_len);
            }
            // Flush explicitly: dropping a tokio File swallows any error
            // from buffered writes that have not yet reached the OS.
            fh.flush().await.or_raise(|| CrawlerError {
                message: format!("fail to flush file at {}", path.display()),
                status: ErrorStatus::Permanent,
            })?;
            pb.finish_and_clear();
            // Fixed: validate size and checksum independently. Previously
            // both were gated behind a single tuple `if let`, so the size
            // check was skipped when no checksum existed and vice versa.
            if let Some(expected_size) = expected_size {
                if got_size != expected_size {
                    exn::bail!(CrawlerError {
                        message: format!("size wrong, expect {expected_size}, got {got_size}"),
                        status: ErrorStatus::Permanent
                    })
                }
            }
            if let Some(expected_checksum) = expected_checksum {
                let checksum = hex::encode(
                    hasher
                        .expect("hasher is initialized whenever a checksum is present")
                        .finalize(),
                );
                if checksum != *expected_checksum {
                    exn::bail!(CrawlerError {
                        message: format!(
                            "checksum wrong, expect {expected_checksum}, got {checksum}"
                        ),
                        status: ErrorStatus::Permanent
                    })
                }
            }
            Ok(())
        }
    }
}
/// Abbreviate a path for single-line display: each directory component is
/// shortened to its first character while the file name is kept whole,
/// e.g. "foo/bar/baz.txt" becomes "f/b/baz.txt".
fn compact_path(full_path: &str) -> String {
    let path = Path::new(full_path);
    let mut parts: Vec<String> = Vec::new();
    if let Some(parent) = path.parent() {
        for component in parent.components() {
            let text = component.as_os_str().to_string_lossy();
            // An empty component contributes an empty segment, matching
            // the behaviour of joining on "/".
            let abbreviated = match text.chars().next() {
                Some(first) => first.to_string(),
                None => String::new(),
            };
            parts.push(abbreviated);
        }
    }
    if let Some(name) = path.file_name() {
        parts.push(name.to_string_lossy().into_owned());
    }
    parts.join("/")
}
/// Extension trait adding a crawl-and-download operation to a dataset.
#[async_trait]
pub trait DownloadExt {
    /// Crawls the dataset and downloads every entry beneath `dst_dir`,
    /// returning the number of file entries encountered.
    ///
    /// * `client` - HTTP client used for crawling and downloading.
    /// * `dst_dir` - local directory the dataset tree is mirrored into.
    /// * `mp` - progress manager used to render per-file progress bars.
    /// * `limit` - maximum number of entries processed concurrently.
    /// * `filter` - optional filter; when `Some`, only files whose relative
    ///   path matches are downloaded (directories always pass).
    async fn download_with_validation<P>(
        self,
        client: &Client,
        dst_dir: P,
        mp: impl ProgressManager,
        limit: usize,
        filter: Option<&FileFilter>,
    ) -> Result<usize, Exn<CrawlerError>>
    where
        P: AsRef<Path> + Sync + Send;
}
#[async_trait]
impl DownloadExt for Dataset {
    /// Crawl this dataset, download each matching entry under `dst_dir`
    /// (validating where metadata allows), and return the count of files
    /// that were processed.
    async fn download_with_validation<P>(
        self,
        client: &Client,
        dst_dir: P,
        mp: impl ProgressManager,
        limit: usize,
        filter: Option<&FileFilter>,
    ) -> Result<usize, Exn<CrawlerError>>
    where
        P: AsRef<Path> + Sync + Send,
    {
        let root_dir = self.root_dir();
        // Make sure the dataset root exists locally before streaming.
        let target = dst_dir.as_ref().join(root_dir.relative());
        fs::create_dir_all(target.as_path()).or_raise(|| CrawlerError {
            message: format!("cannot create dir at '{}'", target.display()),
            status: ErrorStatus::Permanent,
        })?;
        let file_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&file_count);
        let entries = crawl(
            client.clone(),
            Arc::clone(&self.backend),
            root_dir,
            mp.clone(),
        );
        entries
            .try_filter(move |entry| {
                // Directories always pass so traversal continues; files are
                // subject to the optional filter.
                let keep = match entry {
                    Entry::Dir(_) => true,
                    Entry::File(file_meta) => {
                        filter.map_or(true, |f| f.matches(file_meta.relative().as_str()))
                    }
                };
                futures_util::future::ready(keep)
            })
            .try_for_each_concurrent(limit, |entry| {
                let dst_dir = dst_dir.as_ref().to_path_buf();
                let mp = mp.clone();
                let counter = Arc::clone(&counter);
                async move {
                    // Only file entries contribute to the returned total.
                    if let Entry::File(_) = &entry {
                        counter.fetch_add(1, Ordering::Relaxed);
                    }
                    download_crawled_file_with_validation(client, entry, &dst_dir, mp).await
                }
            })
            .await
            .or_raise(|| CrawlerError {
                message: "crawl, download and validation failed".to_string(),
                status: ErrorStatus::Permanent,
            })?;
        Ok(file_count.load(Ordering::Relaxed))
    }
}
/// Extension trait exposing the raw crawl stream of a dataset.
pub trait CrawlExt {
    /// Returns a stream of crawled entries (directories and files) rooted
    /// at the dataset's root directory. Consumes `self`.
    ///
    /// * `client` - HTTP client used to drive the crawl.
    /// * `mp` - progress manager used while crawling.
    fn crawl(
        self,
        client: &Client,
        mp: impl ProgressManager,
    ) -> BoxStream<'static, Result<Entry, Exn<CrawlerError>>>;
}
impl CrawlExt for Dataset {
    /// Produce the crawl stream for this dataset, delegating to the
    /// free-standing `crawl` with this dataset's backend and root dir.
    fn crawl(
        self,
        client: &Client,
        mp: impl ProgressManager,
    ) -> BoxStream<'static, Result<Entry, Exn<CrawlerError>>> {
        let root_dir = self.root_dir();
        // `mp` is owned and this is its last use, so pass it by value;
        // the previous `mp.clone()` here was a redundant clone.
        crawl(client.clone(), Arc::clone(&self.backend), root_dir, mp)
    }
}