tag2upload_service_manager/
db_data.rs

//! Database
//!
//! See [`JobStatus`] for the states of a job,
//! and their progression through the db.
//!
//! # Coalescing
//!
//!  1. We don't, simultaneously in two jobs, do either of the following:
//!
//!      - work on the same objectid
//!      - work on the same repo URL
//!
//!     Implemented in [`JobInWorkflow::start`].
//!
//!  2. We don't try fetching for an objectid if we have other jobs
//!     for the same objectid which do have the tag data;
//!     what happens depends on the other job's state
//!     (a sketch of this decision follows the list):
//!
//!     1. **Noticed / Queued / Building**:
//!        They'll eventually progress to a different state.
//!        This job should wait.
//!        Implemented in [`JobInWorkflow::start_for_forge`].
//!
//!     2. **Irrecoverable / Uploaded**:
//!        This job is (presumably) Noticed or Queued.
//!        It will be marked as a duplicate of that one,
//!        and will take on its status.
//!        Implemented in [`db_workflow::coalesce_completed`].
//!
//!     3. **Failed**:
//!        This job may proceed.
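//!
//! A minimal sketch of this per-status decision (the `Decision` enum and
//! `coalesce_with` helper are hypothetical, for illustration only; the real
//! logic lives in the workflow code referenced above):
//!
//! ```ignore
//! enum Decision {
//!     Wait,          // other job will progress; this job should wait
//!     MarkDuplicate, // set `duplicate_of` and take on the other job's status
//!     Proceed,       // fetch the tag data for this job ourselves
//! }
//!
//! fn coalesce_with(other: JobStatus) -> Decision {
//!     use JobStatus::*;
//!     match other {
//!         Noticed | Queued | Building => Decision::Wait,
//!         Irrecoverable | Uploaded => Decision::MarkDuplicate,
//!         Failed => Decision::Proceed,
//!         // NotForUs isn't covered by the rules above; treated like
//!         // Failed purely for the purposes of this sketch.
//!         NotForUs => Decision::Proceed,
//!     }
//! }
//! ```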
//!
//! # Replay / manual retry
//!
//! We look at the timestamp (in the `tagger` line).
//! We don't accept tags more than SLOP in the future,
//! or more than MAX_AGE in the past.
//! We expire things from our database no earlier than MAX_AGE.
//!
//! If we get the same tag again (by git object id), it's ignored
//! (see above).
//! There is no facility for manually forcing a retry.
//! The uploader should use a new version number.
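//!
//! A minimal sketch of the acceptance window (here `SLOP` and `MAX_AGE`
//! stand in for the real configured values, and the helper name is
//! hypothetical):
//!
//! ```ignore
//! fn tag_timestamp_acceptable(now: TimeT, tagger_time: TimeT) -> bool {
//!     // Reject tags dated too far in the future or too far in the past.
//!     tagger_time <= now + SLOP && tagger_time >= now - MAX_AGE
//! }
//! ```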

use crate::prelude::*;

//==================== jobs table ====================

/// Guarantees:
///
/// * The configured repository and forge and calling IP address were fine
/// * The tag name is plausible
#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBsqlRow, UpdateSqlRow, UiMap)]
pub struct JobData {
    pub repo_git_url: String,
    pub tag_objectid: GitObjectId,
    pub tag_name: String,
    pub forge_host: Hostname,
    pub forge_namever: forge::Namever,
    #[deftly(ui(skip))]
    pub forge_data: ForgeData,
    // We don't use this operationally, but it's very useful for reporting
    #[deftly(bsql(flatten), ui(flatten))]
    pub tag_meta: t2umeta::Parsed,
}

/// `jobs` table row
#[derive(Debug, Deftly, Clone)]
#[derive_deftly(FromSqlRow, AsBsqlRow, UpdateSqlRow)]
#[derive_deftly(UiMap)]
pub struct JobRow {
    #[deftly(bsql(rowid))]
    pub jid: JobId,
    #[deftly(bsql(flatten), ui(flatten))]
    pub data: JobData,
    pub received: TimeT,

    /// Time of the earliest retry
    ///
    /// This is calculated by the retry scheduling algorithm
    /// based on information in `job_history`.
    /// When it's reached, we recalculate (in case of config changes, etc.).
    /// So this stored value is allowed to be too early.
    ///
    /// `0` if this job has never failed
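    ///
    /// A minimal sketch (with hypothetical helper names) of how this field
    /// is consulted:
    ///
    /// ```ignore
    /// if now >= job.retry_earliest {
    ///     // Recalculate from `job_history`; only retry if the fresh value
    ///     // has also been reached, since the stored one may be too early.
    ///     if now >= recalculate_retry_earliest(&job) {
    ///         attempt_retry(&job);
    ///     }
    /// }
    /// ```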
    pub retry_earliest: TreatZeroAsNone<TimeT>,

    #[deftly(ui(skip))]
    pub tag_data: NoneIsEmpty<TagObjectData>,
    #[deftly(bsql(flatten), ui(flatten))]
    pub s: JobState,
}

/// Primary state of a job
///
/// Preserved at each state change, for the job history in the web UI.
/// Doesn't include the tag data.
#[derive(Debug, Deftly, Clone)]
#[derive_deftly(FromSqlRow, AsBsqlRow, UpdateSqlRow)]
#[derive_deftly(UiMap, UpdateWorkerReport)]
pub struct JobState {
    #[deftly(worker_report)]
    pub status: JobStatus,
    pub last_update: TimeT,
    /// Not `None` iff we are currently actually processing this job somehow
    ///
    /// `"tempfail"` ([`JOB_PROCESSING_TEMPFAIL`]) for a temporary failure;
    /// this appears in the `job_history` table exactly once for each such failure.
    pub processing: NoneIsEmpty<ProcessingInfo>,
    #[deftly(worker_report)]
    pub info: String,
    pub duplicate_of: Option<JobId>,

    /// Number of attempts that "count"
    ///
    /// Incremented just before an attempt.
    ///
    /// If we insist on seeing successes in between failures,
    /// we may retry more often, but those extra attempts aren't counted here.
    pub retry_salient_count: u32,
}

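/// Status of a job
///
/// Each variant's doc describes what it implies and, where relevant,
/// what the next state can be.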
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Deftly, strum::EnumIter)]
#[derive_deftly(FromSqlStrEnum, ToSqlEnum, UiDisplayEnum)]
#[cfg_attr(test, derive_deftly_adhoc)]
pub enum JobStatus {
    /// Webhook request has been received and queued.
    ///
    /// The webhook details including the tag name look plausible.
    /// We're happy it's at the right forge etc.
    ///
    /// Next state is `Queued` or `NotForUs`.
    Noticed,

    /// Tag object has been fetched, job is ready for an Oracle worker
    ///
    /// `JobRow.tag_data` is nonempty, and
    /// `JobRow.tag_objectid` has been confirmed locally.
    Queued,

    /// Job has been given to an Oracle worker for processing.
    ///
    /// This state ought to be accompanied by a connection
    /// from the worker, through which we will get the outcome.
    /// If it isn't (eg after restart), the job is irrecoverable.
    Building,

    /// Tag is *not* a (current) instruction to us
    NotForUs,

    /// Job has failed; other attempts for the same tag may work
    ///
    /// This is a possible next state from any of the other states.
    Failed,

    /// Job has failed; other attempts for the same tag are doomed
    ///
    /// This is a possible next state from any of the other states.
    Irrecoverable,

    /// Job has been completed successfully
    Uploaded,
}

pub const JOB_PROCESSING_TEMPFAIL: &str = "tempfail";

#[cfg(test)] pub(crate) use derive_deftly_driver_JobStatus;

//==================== jobs_history table ====================

/// `jobs_history` table row
#[derive(Debug, Deftly, Clone, Deref)]
#[derive_deftly(FromSqlRow)]
// maintained by a TRIGGER, so not AsBsqlRow or UpdateSqlRow
pub struct JobHistoryRow {
    pub histent: HistEntId,
    pub jid: JobId,
    #[deftly(bsql(flatten))]
    #[deref]
    pub s: JobState,
}

//==================== shown_status field ====================

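// Provides an inherent `iter()` over every value of an enum: unit variants
// are yielded directly, and for a single-field tuple variant the field
// type's own `iter()` is chained in, wrapped in that variant's constructor.
// For `ShownJobStatus` below this yields every `JobStatus` wrapped in
// `ShownJobStatus::JobStatus`, followed by `ShownJobStatus::Duplicate`.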
define_derive_deftly! {
    RecursiveEnumIter:

    impl $ttype {
        pub fn iter() -> impl Iterator<Item = Self> {
            chain!( $(
                ${if v_is_unit {
                    [$vtype]
                } else {
                    $( $ftype::iter() ).map($vtype)
                }},
            ))
        }
    }
}

#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
#[derive(derive_more::Display, Deftly)]
#[derive_deftly(RecursiveEnumIter, UiDisplayEnum, FromSqlStrEnum, ToSqlEnum)]
pub enum ShownJobStatus {
    #[display("{_0:?}")]
    JobStatus(JobStatus),
    Duplicate,
}
impl ShownJobStatus {
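    /// Illustrative only: a job with `duplicate_of` set is shown as
    /// `Duplicate`, whatever its own status.
    ///
    /// ```ignore
    /// // given some `jid: JobId`
    /// assert_eq!(ShownJobStatus::new(JobStatus::Failed, Some(jid)),
    ///            ShownJobStatus::Duplicate);
    /// assert_eq!(ShownJobStatus::new(JobStatus::Failed, None),
    ///            ShownJobStatus::JobStatus(JobStatus::Failed));
    /// ```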
    pub fn new(this_status: JobStatus, duplicate_of: Option<JobId>) -> Self {
        if duplicate_of.is_some() {
            ShownJobStatus::Duplicate
        } else {
            ShownJobStatus::JobStatus(this_status)
        }
    }
}
impl ShownJobStatus {
    pub fn from_state(row: &JobState) -> Self {
        ShownJobStatus::new(row.status, row.duplicate_of)
    }
}
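// `Uploaded` compares greatest, then `Duplicate`, then every other status;
// within that last group, ties are broken by `JobStatus`'s own derived
// ordering.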
impl Ord for ShownJobStatus {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        use ShownJobStatus as S;
        use JobStatus as J;
        let o = |v| match v {
            S::JobStatus(s @ J::Uploaded) => (30, Some(s)),
            S::Duplicate                  => (20, None),
            S::JobStatus(other)           => (10, Some(other)),
        };
        o(*self).cmp(&o(*other))
    }
}
impl PartialOrd for ShownJobStatus {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

//==================== last_expiry ====================

#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBsqlRow, UpdateSqlRow)]
pub struct LastExpiryRow {
    pub run: TimeT,
    pub threshold: TimeT,
}

//==================== stats_by_shown_status* tables ====================

#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBsqlRow, UpdateSqlRow)]
pub struct StatsByShownStatusRow {
    pub shown_status: ShownJobStatus,
    pub n_jobs: i64,
}

//==================== pause_insns table ====================

#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBsqlRow, UpdateSqlRow)]
pub struct PauseInsn {
    pub pause_id: PauseId,
    pub pause_key: String,
    pub pause_info: String,
    pub throttle: bool,
}