tag2upload_service_manager/
db_data.rs

1//! Database
2//!
3//! See `JobStatus` for the states of a job,
4//! and progression through the db.
5//!
6//! # Coalescing
7//!
8//!  1. We don't do, simultaneously in two jobs
9//!
10//!      - work on the same objectid
11//!      - work on the same repo URL
12//!
13//!     Implemented in [`JobInWorkflow::start`].
14//!
15//!  2. We don't try fetching for an objectid, if we have other jobs
16//!    for the same objectid which do have the tag data,
17//!    depending on the other job's state:
18//!
19//!     1. **Noticed / Queued / Building**:
20//!       They'll eventually progress to a different state.
21//!       This job should wait.
22//!       Implemented in [`JobInWorkflow::start_for_forge`].
23//!
24//!     2. **Irrecoverable / Uploaded**:
25//!       This job is (presumably) Noticed or Queued.
26//!       It will be marked a duplicate of that one,
27//!       and takes on its status.
28//!       Implemented in [`db_workflow::coalesce_completed`].
29//!
30//!     3. **Failed**:
31//!       This job may proceed.
32//!
33//! # Replay / manual retry:
34//!
35//! We look at the timestamp (in the `tagger` line).
//! We don't accept tags more than SLOP in the future,
37//! or more than MAX_AGE in the past.
38//! We expire things from our database no earlier than MAX_AGE.
39//!
40//! If we get the same tag again (by git object id), it's ignored.
41//! (see above).
42//! There is no facility for manually forcing a retry.
43//! The uploader should use a new version number.
44//!
45//! # Forge up status
46//!
47//! TODO forge up status is not implemented
48//!
49//! We remember for each forge whether we think it's up.
50//!
51//! We start out thinking the forge is down.
52//! When we think the forge is down, we do a preflight
53//! check (https request) to see if it seems to have come up.
54//! We retry that check at increasing intervals.
55//!
56//! Whenever a job fails, we go back to thinking the forge is down.
57
58use crate::prelude::*;
59
60//==================== jobs table ====================
61
/// Guarantees:
///
/// * The configured repository and forge and calling IP address were fine
/// * The tag name is plausible
#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow, UiMap)]
pub struct JobData {
    /// Git URL of the repository; jobs for the same URL aren't worked on
    /// simultaneously (see "Coalescing" in the module docs)
    pub repo_git_url: String,
    /// Object id of the tag; used to coalesce jobs for the same tag
    /// (see "Coalescing" in the module docs)
    pub tag_objectid: GitObjectId,
    /// Name of the tag (already checked to be plausible, per the guarantees above)
    pub tag_name: String,
    /// Hostname of the forge this job relates to
    pub forge_host: Hostname,
    // presumably identifies the forge's kind/version — TODO confirm semantics
    pub forge_namever: ForgeNamever,
    /// Forge-specific data; not shown in the UI
    #[deftly(ui(skip))]
    pub forge_data: ForgeData,
    // We don't use this operationally, but it's very useful for reporting
    #[deftly(bsql(flatten), ui(flatten))]
    pub tag_meta: t2umeta::Parsed,
}
81
/// One row of the jobs table: the validated request data plus workflow state.
#[derive(Debug, Deftly, Clone)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow)]
#[derive_deftly(UiMap, UpdateWorkerReport)]
pub struct JobRow {
    /// Job id (mapped to the table's rowid, per the `bsql(rowid)` attribute)
    #[deftly(bsql(rowid))]
    pub jid: JobId,
    /// The validated webhook data (flattened into this row's columns)
    #[deftly(bsql(flatten), ui(flatten))]
    pub data: JobData,
    /// When the webhook request was received
    pub received: TimeT,
    /// When this row was last updated
    pub last_update: TimeT,
    /// The fetched tag object; nonempty from `Queued` onwards (see `JobStatus`)
    #[deftly(ui(skip))]
    pub tag_data: NoneIsEmpty<TagObjectData>,
    /// Where the job is in the workflow (marked for `UpdateWorkerReport`)
    #[deftly(worker_report)]
    pub status: JobStatus,
    /// Not `None` iff we are currently actually processing this job somehow
    pub processing: NoneIsEmpty<ProcessingInfo>,
    /// Human-readable information about the job (marked for `UpdateWorkerReport`)
    #[deftly(worker_report)]
    pub info: String,
    /// If set, this job was coalesced with that job and takes on its status
    /// (see "Coalescing" in the module docs)
    pub duplicate_of: Option<JobId>,
}
102
/// The state of a job, and its progression through the db (see module docs).
//
// NOTE: variant order is significant: the derived `Ord` and the
// `strum::EnumIter` iteration order follow declaration order.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Deftly, strum::EnumIter)]
#[derive_deftly(FromSqlStrEnum, ToSqlEnum, UiDisplayEnum)]
#[cfg_attr(test, derive_deftly_adhoc)]
pub enum JobStatus {
    /// Webhook request has been received and queued.
    ///
    /// The webhook details including the tag name look plausible.
    /// We're happy it's at the right forge etc.
    ///
    /// Next state is `Queued` or `NotForUs`
    Noticed,

    /// Tag object has been fetched, job is ready for an Oracle worker
    ///
    /// `JobRow.tag_data` is nonempty, and
    /// `JobRow.tag_objectid` has been confirmed locally.
    Queued,

    /// Job has been given to an Oracle worker for processing.
    ///
    /// This state ought to be accompanied by a connection
    /// from the worker, through which we will get the outcome.
    /// If it isn't (eg after restart), the job is irrecoverable.
    Building,

    /// Tag is *not* a (current) instruction to us
    NotForUs,

    /// Job has failed; other attempts for the same tag may work
    ///
    /// This is a possible next state from any of the other states.
    Failed,

    /// Job has failed; other attempts for the same tag are doomed
    ///
    /// This is a possible next state from any of the other states.
    Irrecoverable,

    /// Job has been completed successfully
    Uploaded,
}
145
146#[cfg(test)] pub(crate) use derive_deftly_driver_JobStatus;
147
148//==================== shown_status field ====================
149
// Derives an inherent `iter()` yielding every value of an enum whose
// variants are either units, or tuple variants wrapping types which
// themselves provide an `iter()` (eg, via this same derive).
//
// (`//` comments inside the template are removed at lex time, so the
// derive-deftly engine never sees them.)
define_derive_deftly! {
    RecursiveEnumIter:

    impl $ttype {
        pub fn iter() -> impl Iterator<Item = Self> {
            chain!( $(
                ${if v_is_unit {
                    // Unit variant: yield the variant value itself
                    // (a one-element array, chained in).
                    [$vtype]
                } else {
                    // Tuple variant: iterate the field's type and wrap
                    // each value via the variant's constructor.
                    $( $ftype::iter() ).map($vtype)
                }},
            ))
        }
    }
}
165
/// Job status as shown to users: either the underlying [`JobStatus`],
/// or `Duplicate` for a job coalesced with another (see module docs).
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
#[derive(derive_more::Display, Deftly)]
#[derive_deftly(RecursiveEnumIter, UiDisplayEnum, FromSqlStrEnum, ToSqlEnum)]
pub enum ShownJobStatus {
    /// Not a duplicate: shown as the underlying status (`Debug` formatting)
    #[display("{_0:?}")]
    JobStatus(JobStatus),
    /// This job is a duplicate of another job (`JobRow.duplicate_of` is set)
    Duplicate,
}
174impl ShownJobStatus {
175    pub fn new(this_status: JobStatus, duplicate_of: Option<JobId>) -> Self {
176        if let Some(_) = duplicate_of {
177            ShownJobStatus::Duplicate
178        } else {
179            ShownJobStatus::JobStatus(this_status)
180        }
181    }
182}
183impl ShownJobStatus {
184    pub fn from_row(row: &JobRow) -> Self {
185        ShownJobStatus::new(row.status, row.duplicate_of)
186    }
187}
188impl Ord for ShownJobStatus {
189    fn cmp(&self, other: &Self) -> cmp::Ordering {
190        use ShownJobStatus as S;
191        use JobStatus as J;
192        let o = |v| match v {
193            S::JobStatus(s @ J::Uploaded) => (30, Some(s)),
194            S::Duplicate                  => (20, None),
195            S::JobStatus(other)           => (10, Some(other)),
196        };
197        o(*self).cmp(&o(*other))
198    }
199}
200impl PartialOrd for ShownJobStatus {
201    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
202        Some(self.cmp(other))
203    }
204}
205
206//==================== last_expiry ====================
207
/// Record of the most recent database expiry run.
///
/// (See "Replay / manual retry" in the module docs: we expire things
/// from the database no earlier than MAX_AGE.)
#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct LastExpiryRow {
    // presumably when the expiry run happened — TODO confirm against the writer
    pub run: TimeT,
    // presumably the cutoff used by that run — TODO confirm against the
    // expiry code, which isn't visible in this file
    pub threshold: TimeT,
}
214
215//==================== stats_by_shown_status* tables ====================
216
/// One row of the `stats_by_shown_status*` tables:
/// the number of jobs with a particular shown status.
#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct StatsByShownStatusRow {
    /// The displayed status being counted (see [`ShownJobStatus`])
    pub shown_status: ShownJobStatus,
    /// Number of jobs with that status
    pub n_jobs: i64,
}
223
224//==================== pause_insns table ====================
225
/// One row of the `pause_insns` table: an instruction to pause
/// (or throttle) some processing.
///
/// NOTE(review): the precise semantics of these fields aren't visible
/// in this file — confirm against the code that consumes them.
#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct PauseInsn {
    /// Id of this pause instruction
    pub pause_id: PauseId,
    // presumably selects what is being paused — TODO confirm
    pub pause_key: String,
    /// Human-readable information about the pause
    pub pause_info: String,
    // presumably: throttle rather than fully pause — TODO confirm
    pub throttle: bool,
}