// tag2upload_service_manager/fetcher.rs
2use crate::prelude::*;
3
4struct Fetcher {
5 some_forge: &'static dyn ForgeDataVersion,
6 forge_host: Hostname,
7 task_tmpdir: String,
8}
9
10pub struct ValidTagObjectData {
11 pub tag_data: TagObjectData,
12 pub is_recent_enough: IsRecentEnough,
13}
14
15pub enum FetchError {
16 NotForUs(NotForUsReason),
17 Problem(ProcessingError),
18}
19
20impl From<NotForUsReason> for FetchError {
21 fn from(e: NotForUsReason) -> FetchError {
22 FetchError::NotForUs(e)
23 }
24}
25
26impl<E: Into<ProcessingError>> From<E> for FetchError {
27 fn from(e: E) -> FetchError {
28 FetchError::Problem(e.into())
29 }
30}
31
32pub fn record_fetch_outcome(
33 job: JobInWorkflow,
34 tag_data: Result<ValidTagObjectData, FetchError>,
35) -> Result<(), IE> {
36 let host = &job.data.forge_host;
37 let jid = job.jid;
38
39 let outcome = match tag_data {
40 Ok(tag_data) => {
41 debug!(%host, %jid, "fetched tag OK");
42
43 let ValidTagObjectData {
44 tag_data,
45 is_recent_enough: IsRecentEnough { .. },
46 } = tag_data;
47 let tag_data: NoneIsEmpty<TagObjectData> = Some(tag_data).into();
48
49 WfOutcome::Success {
50 status: JobStatus::Queued,
51 info: format!("tag fetched, ready to process"),
52 jset: bsql_update!( JobRow { tag_data } ),
53 }
54 },
55 Err(FetchError::NotForUs(not_for_us)) => {
56 debug!(%host, %jid, "found tag but not for us");
57 WfOutcome::Success {
58 status: JobStatus::NotForUs,
59 info: format!("not for us: {}", not_for_us),
60 jset: JobRowUpdate::default(),
61 }
62 },
63 Err(FetchError::Problem(problem)) => {
64 info!(%host, %jid, "tag fetch failed: {problem:#}");
65 problem.wf_outcome("tag fetch failed")
66 }
67 };
68
69 trace!(host=%job.data.forge_host, %jid, "tag fetch work ended");
70
71 job.outcome(outcome)?;
72
73 Ok(())
74}
75
76impl Fetcher {
77 async fn make_progress(&self)
78 -> Result<(), QuitTask>
79 {
80 self.some_forge.make_progress(
81 &self.forge_host,
82 &self.task_tmpdir,
83 ).await
84 }
85
86 async fn fetch_loop_iteration(
87 &self,
88 ) -> Result<(), QuitTask> {
89 remove_dir_all::ensure_empty_dir(&self.task_tmpdir)
90 .with_context(|| self.task_tmpdir.clone())
91 .into_internal("create/ensure empty dir")?;
92
93 self.make_progress().await
94 }
95
96 pub async fn task(self) -> Result<Void, QuitTask> {
97 loop {
98 self.fetch_loop_iteration().await?;
99 }
100 }
101}
102
103pub fn start_tasks(globals: &Arc<Globals>) {
104 for forge in &globals.config.t2u.forges {
105 for sf in {
106 forge::FORGES.iter().cloned()
107 .filter(|sf| sf.kind_name() == forge.kind)
108 } {
109 for i in 0..forge.max_concurrent_fetch {
110 let task_tmpdir =
111 format!("{}/fetch,{},{},{i}",
112 globals.scratch_dir, sf.namever(), forge.host);
113 globals.spawn_task_running(
114 format!("fetcher [{} {} {i}]",
115 forge.host, sf.namever()),
116 Fetcher {
117 some_forge: sf,
118 forge_host: forge.host.clone(),
119 task_tmpdir,
120 }.task().map(|r| Err(r.void_unwrap_err()))
121 );
122 }
123 }
124 }
125}