tag2upload_service_manager/db_data.rs
//! Database
//!
//! See `JobStatus` for the states of a job,
//! and progression through the db.
//!
//! # Coalescing
//!
//! 1. We don't do either of the following simultaneously in two jobs:
//!
//!    - work on the same objectid
//!    - work on the same repo URL
//!
//!    Implemented in [`JobInWorkflow::start`].
//!
//! 2. We don't try fetching for an objectid if we have other jobs
//!    for the same objectid which already have the tag data.
//!    What happens then depends on the other job's state
//!    (sketched after this list):
//!
//!    1. **Noticed / Queued / Building**:
//!       They'll eventually progress to a different state.
//!       This job should wait.
//!       Implemented in [`JobInWorkflow::start_for_forge`].
//!
//!    2. **Irrecoverable / Uploaded**:
//!       This job (presumably Noticed or Queued)
//!       will be marked a duplicate of the other one,
//!       and will take on its status.
//!       Implemented in [`db_workflow::coalesce_completed`].
//!
//!    3. **Failed**:
//!       This job may proceed.
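//!
//! For illustration, the rule 2 decision on the other job's state
//! amounts to something like this (a sketch, not code from this module):
//!
//! ```text
//! match other_job.status {
//!     Noticed | Queued | Building => Wait,           // case 2.1
//!     Irrecoverable | Uploaded    => MarkDuplicate,  // case 2.2
//!     Failed                      => Proceed,        // case 2.3
//! }
//! ```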
//!
//! # Replay / manual retry
//!
//! We look at the timestamp (in the `tagger` line).
//! We don't accept tags more than SLOP in the future,
//! or more than MAX_AGE in the past.
//! We expire things from our database no earlier than MAX_AGE.
//!
//! If we get the same tag again (by git object id),
//! it's ignored (see above).
//! There is no facility for manually forcing a retry.
//! The uploader should use a new version number.
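//!
//! So the acceptance window for the `tagger` timestamp is, roughly
//! (a sketch, not code from this module):
//!
//! ```text
//! now - MAX_AGE <= tagger_time && tagger_time <= now + SLOP
//! ```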
//!
//! # Forge up status
//!
//! TODO forge up status is not implemented
//!
//! We remember for each forge whether we think it's up.
//!
//! We start out thinking the forge is down.
//! When we think the forge is down, we do a preflight
//! check (https request) to see if it seems to have come up.
//! We retry that check at increasing intervals.
//!
//! Whenever a job fails, we go back to thinking the forge is down.
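//!
//! A sketch of the intended recheck loop (hypothetical: this is the
//! TODO above, and all names here are illustrative only):
//!
//! ```text
//! forge.up = false;                             // after a job failure
//! while !forge.up {
//!     sleep(interval);
//!     forge.up = preflight_https_check(forge);  // https request
//!     interval = grow(interval);                // increasing intervals
//! }
//! ```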

use crate::prelude::*;

//==================== jobs table ====================

/// Guarantees:
///
/// * The configured repository and forge and calling IP address were fine
/// * The tag name is plausible
#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow, UiMap)]
pub struct JobData {
    pub repo_git_url: String,
    pub tag_objectid: GitObjectId,
    pub tag_name: String,
    pub forge_host: Hostname,
    pub forge_namever: ForgeNamever,
    #[deftly(ui(skip))]
    pub forge_data: ForgeData,
    // We don't use this operationally, but it's very useful for reporting
    #[deftly(bsql(flatten), ui(flatten))]
    pub tag_meta: t2umeta::Parsed,
}

#[derive(Debug, Deftly, Clone)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow)]
#[derive_deftly(UiMap, UpdateWorkerReport)]
pub struct JobRow {
    #[deftly(bsql(rowid))]
    pub jid: JobId,
    #[deftly(bsql(flatten), ui(flatten))]
    pub data: JobData,
    pub received: TimeT,
    pub last_update: TimeT,
    #[deftly(ui(skip))]
    pub tag_data: NoneIsEmpty<TagObjectData>,
    #[deftly(worker_report)]
    pub status: JobStatus,
    /// Not `None` iff we are currently actually processing this job somehow
    pub processing: NoneIsEmpty<ProcessingInfo>,
    #[deftly(worker_report)]
    pub info: String,
    pub duplicate_of: Option<JobId>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Deftly, strum::EnumIter)]
#[derive_deftly(FromSqlStrEnum, ToSqlEnum, UiDisplayEnum)]
#[cfg_attr(test, derive_deftly_adhoc)]
pub enum JobStatus {
    /// Webhook request has been received and queued.
    ///
    /// The webhook details including the tag name look plausible.
    /// We're happy it's at the right forge etc.
    ///
    /// Next state is `Queued` or `NotForUs`
    Noticed,

    /// Tag object has been fetched, job is ready for an Oracle worker
    ///
    /// `JobRow.tag_data` is nonempty, and
    /// `JobRow.tag_objectid` has been confirmed locally.
    Queued,

    /// Job has been given to an Oracle worker for processing.
    ///
    /// This state ought to be accompanied by a connection
    /// from the worker, through which we will get the outcome.
    /// If it isn't (eg after restart), the job is irrecoverable.
    Building,

    /// Tag is *not* a (current) instruction to us
    NotForUs,

    /// Job has failed; other attempts for the same tag may work
    ///
    /// This is a possible next state from any of the other states.
    Failed,

    /// Job has failed; other attempts for the same tag are doomed
    ///
    /// This is a possible next state from any of the other states.
    Irrecoverable,

    /// Job has been completed successfully
    Uploaded,
}
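
// Sketch of the expected progression described above (Failed and
// Irrecoverable are possible next states from everywhere, so they
// are omitted):
//
//   Noticed --> Queued --> Building --> Uploaded
//      \
//       `--> NotForUs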

#[cfg(test)] pub(crate) use derive_deftly_driver_JobStatus;

//==================== shown_status field ====================

define_derive_deftly! {
    RecursiveEnumIter:

    impl $ttype {
        pub fn iter() -> impl Iterator<Item = Self> {
            chain!( $(
                ${if v_is_unit {
                    [$vtype]
                } else {
                    $( $ftype::iter() ).map($vtype)
                }},
            ))
        }
    }
}
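
// For `ShownJobStatus` (below), this template expands to, roughly
// (a sketch of the expansion, not the generated code verbatim):
//
//     impl ShownJobStatus {
//         pub fn iter() -> impl Iterator<Item = Self> {
//             chain!(
//                 JobStatus::iter().map(ShownJobStatus::JobStatus),
//                 [ShownJobStatus::Duplicate],
//             )
//         }
//     }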

#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
#[derive(derive_more::Display, Deftly)]
#[derive_deftly(RecursiveEnumIter, UiDisplayEnum, FromSqlStrEnum, ToSqlEnum)]
pub enum ShownJobStatus {
    #[display("{_0:?}")]
    JobStatus(JobStatus),
    Duplicate,
}
impl ShownJobStatus {
    pub fn new(this_status: JobStatus, duplicate_of: Option<JobId>) -> Self {
        if duplicate_of.is_some() {
            ShownJobStatus::Duplicate
        } else {
            ShownJobStatus::JobStatus(this_status)
        }
    }
    pub fn from_row(row: &JobRow) -> Self {
        ShownJobStatus::new(row.status, row.duplicate_of)
    }
}
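
// The `Ord` impl below defines the display order: `Uploaded` sorts
// after everything else, `Duplicate` just before it, and the remaining
// statuses fall back to `JobStatus`'s derived (declaration) order:
//
//   Noticed < Queued < Building < NotForUs < Failed < Irrecoverable
//     < Duplicate < Uploaded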
impl Ord for ShownJobStatus {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        use ShownJobStatus as S;
        use JobStatus as J;
        let o = |v| match v {
            S::JobStatus(s @ J::Uploaded) => (30, Some(s)),
            S::Duplicate => (20, None),
            S::JobStatus(other) => (10, Some(other)),
        };
        o(*self).cmp(&o(*other))
    }
}
impl PartialOrd for ShownJobStatus {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

//==================== last_expiry ====================

#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct LastExpiryRow {
    pub run: TimeT,
    pub threshold: TimeT,
}

//==================== stats_by_shown_status* tables ====================

#[derive(Deftly, Ord, PartialOrd, Eq, PartialEq)]
#[derive_deftly(UiMap, FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct StatsByShownStatusRow {
    pub shown_status: ShownJobStatus,
    pub n_jobs: i64,
}

//==================== pause_insns table ====================

#[derive(Debug, Clone)]
#[derive(Deftly)]
#[derive_deftly(FromSqlRow, AsBSqlRow, UpdateSqlRow)]
pub struct PauseInsn {
    pub pause_id: PauseId,
    pub pause_key: String,
    pub pause_info: String,
    pub throttle: bool,