walker_common/scoop/
mod.rs1mod source;
2
3pub use source::*;
4
5use crate::progress::{Progress, ProgressBar};
6use anyhow::bail;
7use bytes::Bytes;
8use futures_util::{StreamExt, TryStreamExt, stream};
9use tracing::instrument;
10
11#[derive(Default, Debug, PartialEq, Eq, Clone)]
13pub struct ScooperBuilder {
14 pub sources: Vec<Source>,
15 pub delete: bool,
16 pub processed: Option<String>,
17 pub failed: Option<String>,
18}
19
20impl ScooperBuilder {
21 pub async fn build(self) -> anyhow::Result<Scooper> {
22 let sources = self.discover().await?;
23 Ok(Scooper {
24 builder: self,
25 sources,
26 })
27 }
28
29 async fn discover(&self) -> anyhow::Result<Vec<Source>> {
31 Ok(stream::iter(&self.sources)
32 .then(async |source| source.clone().discover().await)
33 .try_collect::<Vec<_>>()
34 .await?
35 .into_iter()
36 .flatten()
37 .collect())
38 }
39}
40
41pub struct Scooper {
43 builder: ScooperBuilder,
44 sources: Vec<Source>,
45}
46
47impl Scooper {
48 #[instrument(skip_all, err)]
49 pub async fn process<F, P>(self, progress: P, processor: F) -> anyhow::Result<()>
50 where
51 for<'a> F: AsyncFn(&'a str, Bytes) -> anyhow::Result<()> + 'a,
52 P: Progress,
53 {
54 let total = self.sources.len();
55 let mut errors = 0usize;
56
57 let mut p = progress.start(total);
58 for source in self.sources {
59 p.set_message(source.name().to_string()).await;
60 match processor(source.name().as_ref(), source.load().await?).await {
61 Ok(()) => {
62 if self.builder.delete {
63 source.delete().await?;
64 } else if let Some(processed) = &self.builder.processed {
65 source.r#move(processed).await?;
66 }
67 }
68 Err(err) => {
69 errors += 1;
70 log::error!("Failed to upload document: {err}");
71 if let Some(failed) = &self.builder.failed {
72 source.r#move(failed).await?;
73 }
74 }
75 }
76 p.tick().await;
77 }
78
79 p.finish().await;
80
81 match errors {
82 0 => {
83 log::info!("Uploaded {total} files");
84 Ok(())
85 }
86 n => bail!("Failed to upload {n} (of {total}) files"),
87 }
88 }
89}