tag2upload_service_manager/
fetcher.rs

1
2use crate::prelude::*;
3
4struct Fetcher {
5    some_forge: &'static dyn ForgeDataVersion,
6    forge_host: Hostname,
7    task_tmpdir: String,
8}
9
10pub struct ValidTagObjectData {
11    pub tag_data: TagObjectData,
12    pub is_recent_enough: IsRecentEnough,
13}
14
15pub enum FetchError {
16    NotForUs(NotForUsReason),
17    Problem(ProcessingError),
18}
19
20impl From<NotForUsReason> for FetchError {
21    fn from(e: NotForUsReason) -> FetchError {
22        FetchError::NotForUs(e)
23    }
24}
25
26impl<E: Into<ProcessingError>> From<E> for FetchError {
27    fn from(e: E) -> FetchError {
28        FetchError::Problem(e.into())
29    }
30}
31
32pub fn record_fetch_outcome(
33    job: JobInWorkflow,
34    tag_data: Result<ValidTagObjectData, FetchError>,
35) -> Result<(), IE> {
36    let host = &job.data.forge_host;
37    let jid = job.jid;
38
39    let outcome = match tag_data {
40        Ok(tag_data) => {
41            debug!(%host, %jid, "fetched tag OK");
42
43            let ValidTagObjectData {
44                tag_data,
45                is_recent_enough: IsRecentEnough { .. },
46            } = tag_data;
47            let tag_data: NoneIsEmpty<TagObjectData> = Some(tag_data).into();
48
49            WfOutcome::Success {
50                status: JobStatus::Queued,
51                info: format!("tag fetched, ready to process"),
52                jset: bsql_update!( JobRow { tag_data } ),
53            }
54        },
55        Err(FetchError::NotForUs(not_for_us)) => {
56            debug!(%host, %jid, "found tag but not for us");
57            WfOutcome::Success {
58                status: JobStatus::NotForUs,
59                info: format!("not for us: {}", not_for_us),
60                jset: JobRowUpdate::default(),
61            }
62        },
63        Err(FetchError::Problem(problem)) => {
64            info!(%host, %jid, "tag fetch failed: {problem:#}");
65            problem.wf_outcome("tag fetch failed")
66        }
67    };
68
69    trace!(host=%job.data.forge_host, %jid, "tag fetch work ended");
70
71    job.outcome(outcome)?;
72
73    Ok(())
74}
75
76impl Fetcher {
77    async fn make_progress(&self)
78        -> Result<(), QuitTask>
79    {
80        self.some_forge.make_progress(
81            &self.forge_host,
82            &self.task_tmpdir,
83        ).await
84    }
85
86    async fn fetch_loop_iteration(
87        &self,
88    ) -> Result<(), QuitTask> {
89        remove_dir_all::ensure_empty_dir(&self.task_tmpdir)
90            .with_context(|| self.task_tmpdir.clone())
91            .into_internal("create/ensure empty dir")?;
92
93        self.make_progress().await
94    }
95
96    pub async fn task(self) -> Result<Void, QuitTask> {
97        loop {
98            self.fetch_loop_iteration().await?;
99        }
100    }
101}
102
103pub fn start_tasks(globals: &Arc<Globals>) {
104    for forge in &globals.config.t2u.forges {
105        for sf in {
106            forge::FORGES.iter().cloned()
107                .filter(|sf| sf.kind_name() == forge.kind)
108        } {
109            for i in 0..forge.max_concurrent_fetch {
110                let task_tmpdir =
111                    format!("{}/fetch,{},{},{i}",
112                            globals.scratch_dir, sf.namever(), forge.host);
113                globals.spawn_task_running(
114                    format!("fetcher [{} {} {i}]",
115                            forge.host, sf.namever()),
116                    Fetcher {
117                        some_forge: sf,
118                        forge_host: forge.host.clone(),
119                        task_tmpdir,
120                    }.task().map(|r| Err(r.void_unwrap_err()))
121                );
122            }
123        }
124    }
125}