use crate::progress::Progress;
use anyhow::bail;
use std::{
future::Future,
path::{Path, PathBuf},
pin::Pin,
};
use tracing::instrument;
/// Configuration for a [`Scooper`]: which paths to pick files up from and what to do with each
/// file once it has been processed.
#[derive(Default, Debug, PartialEq, Eq, Clone)]
pub struct ScooperBuilder {
/// Paths to pick files up from. Each entry may be a single file or a directory; directories
/// are walked recursively and every regular file found in them is included.
pub sources: Vec<PathBuf>,
/// When `true`, a file is removed after it has been processed successfully. This takes
/// precedence over `processed`.
pub delete: bool,
/// Optional directory that successfully processed files are moved into (only consulted when
/// `delete` is `false`). The directory is created when processing starts.
pub processed: Option<PathBuf>,
/// Optional directory that files are moved into when processing them failed. The directory is
/// created when processing starts.
pub failed: Option<PathBuf>,
}
impl ScooperBuilder {
pub fn build(self) -> anyhow::Result<Scooper> {
let files = self.discover()?;
Ok(Scooper {
builder: self,
files,
})
}
fn discover(&self) -> anyhow::Result<Vec<PathBuf>> {
Ok(self
.sources
.iter()
.map(|path| Self::discover_one(path))
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.flatten()
.collect())
}
fn discover_one(path: &Path) -> anyhow::Result<Vec<PathBuf>> {
log::debug!("Discovering: {}", path.display());
if !path.exists() {
bail!("{} does not exist", path.display());
} else if path.is_file() {
log::debug!("Is a file");
Ok(vec![path.to_path_buf()])
} else if path.is_dir() {
log::debug!("Is a directory");
let mut result = Vec::new();
for path in walkdir::WalkDir::new(path).into_iter() {
let path = path?;
if path.file_type().is_file() {
result.push(path.path().to_path_buf());
}
}
Ok(result)
} else {
log::warn!("Is something unknown: {}", path.display());
Ok(vec![])
}
}
}
/// A resolved set of files, ready to be processed.
///
/// Created through [`ScooperBuilder::build`], which discovers the files up front.
pub struct Scooper {
/// The configuration this instance was built from.
builder: ScooperBuilder,
/// The files discovered from the builder's sources at build time.
files: Vec<PathBuf>,
}
impl Scooper {
#[instrument(skip_all, err)]
pub async fn process<F>(self, progress: impl Into<Progress>, processor: F) -> anyhow::Result<()>
where
F: for<'a> Fn(&'a Path) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'a>>,
{
if let Some(processed) = &self.builder.processed {
tokio::fs::create_dir_all(processed).await?;
}
if let Some(failed) = &self.builder.failed {
tokio::fs::create_dir_all(failed).await?;
}
let total = self.files.len();
let mut errors = 0usize;
let progress = progress.into();
let p = progress.start(total);
for file in self.files {
p.set_message(
file.file_name()
.map(|s| s.to_string_lossy())
.unwrap_or_else(|| file.to_string_lossy())
.to_string()
.into(),
);
match processor(&file).await {
Ok(()) => {
if self.builder.delete {
tokio::fs::remove_file(&file).await?;
} else if let Some(processed) = &self.builder.processed {
tokio::fs::copy(&file, processed.join(&file)).await?;
tokio::fs::remove_file(&file).await?;
}
}
Err(err) => {
errors += 1;
log::error!("Failed to upload file: {err}");
if let Some(failed) = &self.builder.failed {
tokio::fs::copy(&file, failed.join(&file)).await?;
tokio::fs::remove_file(&file).await?;
}
}
}
p.tick();
}
drop(p);
match errors {
0 => {
log::info!("Uploaded {total} files");
Ok(())
}
n => bail!("Failed to upload {n} (of {total}) files"),
}
}
}