Skip to main content

walker_common/scoop/
mod.rs

1mod source;
2
3pub use source::*;
4
5use crate::progress::{Progress, ProgressBar};
6use anyhow::bail;
7use bytes::Bytes;
8use futures_util::{StreamExt, TryStreamExt, stream};
9use tracing::instrument;
10
11/// A tool to build a [`Scooper`].
12#[derive(Default, Debug, PartialEq, Eq, Clone)]
13pub struct ScooperBuilder {
14    pub sources: Vec<Source>,
15    pub delete: bool,
16    pub processed: Option<String>,
17    pub failed: Option<String>,
18}
19
20impl ScooperBuilder {
21    pub async fn build(self) -> anyhow::Result<Scooper> {
22        let sources = self.discover().await?;
23        Ok(Scooper {
24            builder: self,
25            sources,
26        })
27    }
28
29    /// Discover files to upload
30    async fn discover(&self) -> anyhow::Result<Vec<Source>> {
31        Ok(stream::iter(&self.sources)
32            .then(async |source| source.clone().discover().await)
33            .try_collect::<Vec<_>>()
34            .await?
35            .into_iter()
36            .flatten()
37            .collect())
38    }
39}
40
41/// A tool to scoop up files
42pub struct Scooper {
43    builder: ScooperBuilder,
44    sources: Vec<Source>,
45}
46
47impl Scooper {
48    #[instrument(skip_all, err)]
49    pub async fn process<F, P>(self, progress: P, processor: F) -> anyhow::Result<()>
50    where
51        for<'a> F: AsyncFn(&'a str, Bytes) -> anyhow::Result<()> + 'a,
52        P: Progress,
53    {
54        let total = self.sources.len();
55        let mut errors = 0usize;
56
57        let mut p = progress.start(total);
58        for source in self.sources {
59            p.set_message(source.name().to_string()).await;
60            match processor(source.name().as_ref(), source.load().await?).await {
61                Ok(()) => {
62                    if self.builder.delete {
63                        source.delete().await?;
64                    } else if let Some(processed) = &self.builder.processed {
65                        source.r#move(processed).await?;
66                    }
67                }
68                Err(err) => {
69                    errors += 1;
70                    log::error!("Failed to upload document: {err}");
71                    if let Some(failed) = &self.builder.failed {
72                        source.r#move(failed).await?;
73                    }
74                }
75            }
76            p.tick().await;
77        }
78
79        p.finish().await;
80
81        match errors {
82            0 => {
83                log::info!("Uploaded {total} files");
84                Ok(())
85            }
86            n => bail!("Failed to upload {n} (of {total}) files"),
87        }
88    }
89}