tag2upload_service_manager/
retry.rs

1
2use crate::prelude::*;
3
/// Marker: this attempt counts towards the (salient) retry count
///
/// An attempt is salient when some other job at the same forge has
/// succeeded since *this* job last failed.
#[derive(Clone, Copy, Debug)]
pub struct IsSalient;
10
/// Marker: the retry policy regards this attempt as the final one
///
/// Carried in the `Err` side of `AttemptInfo::is_final`.
#[derive(Clone, Copy, Debug)]
pub struct IsFinal;
13
14#[derive(Debug)]
15pub enum Instructions {
16    /// Don't do anything until `retry_earliest`
17    Wait {
18        retry_earliest: TimeT,
19    },
20    /// Attempt now
21    Attempt(AttemptInfo),
22}
23
24#[derive(Debug, derive_more::Display)]
25#[display(
26    "[{}{}]",
27    is_salient.map(|IsSalient| "S").unwrap_or_default(),
28    is_final.map(|()| "r").unwrap_or_else(|IsFinal| "F"),
29)]
30pub struct AttemptInfo {
31    pub is_salient: Option<IsSalient>,
32    pub is_final: Result<(), IsFinal>,
33}
34
35#[derive(Debug)]
36pub struct HostState {
37    last_failure: TimeT,
38
39    /// Last success for this host
40    ///
41    /// We don't store this in the db.  That means that we might do
42    /// one additional retry, for each job, for each restart.  That's OK.
43    last_success: Option<HistEntId>,
44}
45
46const DEFAULT_HOST_STATE: HostState = HostState {
47    last_failure: TimeT(0),
48    last_success: None,
49};
50
51#[derive(Default, Debug)]
52pub struct HostStates(HashMap<Hostname, retry::HostState>);
53
54impl Globals {
55    pub fn host_retry_state(&self) -> MutexGuard<'_, HostStates> {
56        self.host_retry_state.lock()
57            .unwrap_or_else(|pe| pe.into_inner())
58    }
59}
60
61impl HostStates {
62    pub fn get(&self, host: &Hostname) -> &HostState {
63        self.0.get(host).unwrap_or(&DEFAULT_HOST_STATE)
64    }
65    pub fn get_mut(&mut self, host: &Hostname) -> &mut HostState {
66        // Ideally we'd clone `host` only if we need to,
67        // but the HashMap API doesn't allow that.
68        self.0.entry(host.clone()).or_insert(DEFAULT_HOST_STATE)
69    }
70}
71
72impl HostState {
73    /// Calculate the job-specific earliest retry time
74    ///
75    /// Implements part of "Retry schedule"
76    /// from TAG2UPLOAD-MANAGER-PROTOCOL.md.
77    /// TODO consider moving that material here.
78    ///
79    /// This is the job-specific part.  The forge-specific retry limit is
80    /// is not reified in the database (and is reset on every t2usm restart).
81    ///
82    /// Run before we make an attempt.
83    pub fn calculate_retry(
84        &self,
85        dbt: &mut DbTransaction,
86        now: TimeT,
87        policy: &config::RetryPolicy,
88        jid: JobId,
89    ) -> Result<retry::Instructions, DbError<IE>> {
90        let earliest_due_to_last_failure_same_host =
91            self.last_failure.0
92            .saturating_add(policy.timeout_initial.as_secs());
93        if now.0 < earliest_due_to_last_failure_same_host {
94            // Recent failure at the forge.  Don't even query the DB.
95            trace!("need to wait for host, {}s",
96                   earliest_due_to_last_failure_same_host - now.0);
97            return Ok(Instructions::Wait {
98                retry_earliest: TimeT(earliest_due_to_last_failure_same_host),
99            })
100        }
101
102        let job_row: JobRow = dbt.bsql_query_1(&bsql!("
103                        SELECT *
104                          FROM jobs
105                         WHERE jid = " jid "
106               "))?;
107
108        // Calculate number of attempts, which tells us the retry interval
109        let (retry_count,) : (u32,) = dbt.bsql_query_1(&bsql!("
110                        SELECT count(*)
111                          FROM job_history
112                         WHERE jid = " jid "
113                           AND processing = " JOB_PROCESSING_TEMPFAIL "
114               "))?;
115
116        // If we haven't had a temporary failure yet, try now (and expect a retry)
117        if retry_count == 0 {
118            trace!("no attempts yet, run right away");
119            return Ok(Instructions::Attempt(AttemptInfo {
120                is_salient: Some(IsSalient),
121                is_final: Ok(()),
122            }));
123        }
124
125        let interval =
126            policy.timeout_initial.as_secs() as f32 *
127            policy.timeout_increase
128            .powi(retry_count.try_into().unwrap_or(i32::MAX));
129        let interval: u64 = cmp::min(
130            interval as u64,
131            policy.timeout_mintotal.as_secs(),
132        );
133
134        // Calculate the retry time based on the previous attempt
135
136        let mut get_attempt = |asc_desc: &dyn Bsql|
137            -> Result<JobHistoryRow, _>
138        {
139            dbt.bsql_query_1(&bsql!("
140                        SELECT *
141                          FROM job_history
142                         WHERE jid = " jid "
143                           AND processing = 'tempfail'
144                      ORDER BY histent " {asc_desc} "
145                         LIMIT 1
146            "))
147        };
148
149        let first_attempt = get_attempt(&bsql!("ASC"))?;
150        let previous_attempt = get_attempt(&bsql!("DESC"))?;
151
152        trace!(?first_attempt, ?previous_attempt,
153               "job-specific calculation");
154
155        let retry_earliest = previous_attempt.last_update + interval;
156        if now >= retry_earliest {
157            return Ok(Instructions::Wait { retry_earliest })
158        }
159
160        // It's time to retry.  But, is it salient and/or final?
161
162        let is_salient = (|| {
163            if previous_attempt.histent >= self.last_success? {
164                return None
165            }
166            Some(IsSalient)
167        })();
168
169        let is_final = if (
170            {
171                retry_count + 1 >= policy.min_retries 
172            } && {
173                let salient_count =
174                    job_row.s.retry_salient_count +
175                    (is_salient.is_some() as u32);
176                salient_count >= policy.min_salient_retries
177            } && {
178                now >=
179                    first_attempt.last_update
180                    + policy.timeout_mintotal.as_secs()
181            }
182        ) {
183            Err(IsFinal)
184        } else {
185            Ok(())
186        };
187
188        Ok(Instructions::Attempt(AttemptInfo { is_salient, is_final }))
189    }
190
191    pub fn note_success(&mut self, histent: HistEntId) {
192        self.last_success.track_maximum_seen(Some(histent))
193    }
194
195    pub fn note_retriable_failure(&mut self, now: TimeT) {
196        self.last_failure.track_maximum_seen(now);
197    }
198}