// tag2upload_service_manager/retry.rs
2use crate::prelude::*;
3
/// Zero-sized marker meaning "this attempt is salient".
///
/// Carried as `Some(IsSalient)` in `AttemptInfo::is_salient`;
/// `None` there means the attempt is not salient.
#[derive(Clone, Copy, Debug)]
pub struct IsSalient;
10
/// Zero-sized marker meaning "this attempt is the final one".
///
/// Carried as `Err(IsFinal)` in `AttemptInfo::is_final`;
/// `Ok(())` there means further retries are still permitted.
#[derive(Clone, Copy, Debug)]
pub struct IsFinal;
13
14#[derive(Debug)]
15pub enum Instructions {
16 Wait {
18 retry_earliest: TimeT,
19 },
20 Attempt(AttemptInfo),
22}
23
24#[derive(Debug, derive_more::Display)]
25#[display(
26 "[{}{}]",
27 is_salient.map(|IsSalient| "S").unwrap_or_default(),
28 is_final.map(|()| "r").unwrap_or_else(|IsFinal| "F"),
29)]
30pub struct AttemptInfo {
31 pub is_salient: Option<IsSalient>,
32 pub is_final: Result<(), IsFinal>,
33}
34
35#[derive(Debug)]
36pub struct HostState {
37 last_failure: TimeT,
38
39 last_success: Option<HistEntId>,
44}
45
46const DEFAULT_HOST_STATE: HostState = HostState {
47 last_failure: TimeT(0),
48 last_success: None,
49};
50
51#[derive(Default, Debug)]
52pub struct HostStates(HashMap<Hostname, retry::HostState>);
53
54impl Globals {
55 pub fn host_retry_state(&self) -> MutexGuard<'_, HostStates> {
56 self.host_retry_state.lock()
57 .unwrap_or_else(|pe| pe.into_inner())
58 }
59}
60
61impl HostStates {
62 pub fn get(&self, host: &Hostname) -> &HostState {
63 self.0.get(host).unwrap_or(&DEFAULT_HOST_STATE)
64 }
65 pub fn get_mut(&mut self, host: &Hostname) -> &mut HostState {
66 self.0.entry(host.clone()).or_insert(DEFAULT_HOST_STATE)
69 }
70}
71
72impl HostState {
73 pub fn calculate_retry(
84 &self,
85 dbt: &mut DbTransaction,
86 now: TimeT,
87 policy: &config::RetryPolicy,
88 jid: JobId,
89 ) -> Result<retry::Instructions, DbError<IE>> {
90 let earliest_due_to_last_failure_same_host =
91 self.last_failure.0
92 .saturating_add(policy.timeout_initial.as_secs());
93 if now.0 < earliest_due_to_last_failure_same_host {
94 trace!("need to wait for host, {}s",
96 earliest_due_to_last_failure_same_host - now.0);
97 return Ok(Instructions::Wait {
98 retry_earliest: TimeT(earliest_due_to_last_failure_same_host),
99 })
100 }
101
102 let job_row: JobRow = dbt.bsql_query_1(&bsql!("
103 SELECT *
104 FROM jobs
105 WHERE jid = " jid "
106 "))?;
107
108 let (retry_count,) : (u32,) = dbt.bsql_query_1(&bsql!("
110 SELECT count(*)
111 FROM job_history
112 WHERE jid = " jid "
113 AND processing = " JOB_PROCESSING_TEMPFAIL "
114 "))?;
115
116 if retry_count == 0 {
118 trace!("no attempts yet, run right away");
119 return Ok(Instructions::Attempt(AttemptInfo {
120 is_salient: Some(IsSalient),
121 is_final: Ok(()),
122 }));
123 }
124
125 let interval =
126 policy.timeout_initial.as_secs() as f32 *
127 policy.timeout_increase
128 .powi(retry_count.try_into().unwrap_or(i32::MAX));
129 let interval: u64 = cmp::min(
130 interval as u64,
131 policy.timeout_mintotal.as_secs(),
132 );
133
134 let mut get_attempt = |asc_desc: &dyn Bsql|
137 -> Result<JobHistoryRow, _>
138 {
139 dbt.bsql_query_1(&bsql!("
140 SELECT *
141 FROM job_history
142 WHERE jid = " jid "
143 AND processing = 'tempfail'
144 ORDER BY histent " {asc_desc} "
145 LIMIT 1
146 "))
147 };
148
149 let first_attempt = get_attempt(&bsql!("ASC"))?;
150 let previous_attempt = get_attempt(&bsql!("DESC"))?;
151
152 trace!(?first_attempt, ?previous_attempt,
153 "job-specific calculation");
154
155 let retry_earliest = previous_attempt.last_update + interval;
156 if now >= retry_earliest {
157 return Ok(Instructions::Wait { retry_earliest })
158 }
159
160 let is_salient = (|| {
163 if previous_attempt.histent >= self.last_success? {
164 return None
165 }
166 Some(IsSalient)
167 })();
168
169 let is_final = if (
170 {
171 retry_count + 1 >= policy.min_retries
172 } && {
173 let salient_count =
174 job_row.s.retry_salient_count +
175 (is_salient.is_some() as u32);
176 salient_count >= policy.min_salient_retries
177 } && {
178 now >=
179 first_attempt.last_update
180 + policy.timeout_mintotal.as_secs()
181 }
182 ) {
183 Err(IsFinal)
184 } else {
185 Ok(())
186 };
187
188 Ok(Instructions::Attempt(AttemptInfo { is_salient, is_final }))
189 }
190
191 pub fn note_success(&mut self, histent: HistEntId) {
192 self.last_success.track_maximum_seen(Some(histent))
193 }
194
195 pub fn note_retriable_failure(&mut self, now: TimeT) {
196 self.last_failure.track_maximum_seen(now);
197 }
198}