1use crate::{
2 error::{Error, UploadError},
3 formats::InputProcessableFormat,
4 future::{LocalBoxFuture, WithPollTimer},
5 repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
6 serde_str::Serde,
7 state::State,
8 store::Store,
9 UploadQuery,
10};
11
12use std::{
13 ops::Deref,
14 rc::Rc,
15 sync::Arc,
16 time::{Duration, Instant},
17};
18use tokio::task::JoinError;
19use tracing::Instrument;
20
// Job implementations: `cleanup` handles deletion work, `process` handles
// ingest/generation work.
pub(crate) mod cleanup;
mod process;

// Queue names passed to `repo.push` / `repo.pop`.
const CLEANUP_QUEUE: &str = "cleanup";
const PROCESS_QUEUE: &str = "process";
// Unique keys passed to `repo.push` so at most one instance of each
// maintenance job is queued at a time (push returns None on conflict).
const OUTDATED_PROXIES_UNIQUE_KEY: &str = "outdated-proxies";
const OUTDATED_VARIANTS_UNIQUE_KEY: &str = "outdated-variants";
const ALL_VARIANTS_UNIQUE_KEY: &str = "all-variants";
const PRUNE_MISSING_UNIQUE_KEY: &str = "prune-missing";
30
/// Payload for jobs pushed onto the "cleanup" queue.
///
/// Serialized to JSON via serde before being stored in the repo, so variant
/// and field names are part of the queued-payload format — do not rename.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
    /// Clean up everything associated with a stored hash.
    Hash {
        hash: Hash,
    },
    /// Clean up a single store identifier.
    Identifier {
        identifier: String,
    },
    /// Clean up an alias, authorized by its delete token.
    Alias {
        alias: Serde<Alias>,
        token: Serde<DeleteToken>,
    },
    /// Clean up variant(s) of a hash; `None` presumably targets all of the
    /// hash's variants — confirm in `cleanup::perform`.
    Variant {
        hash: Hash,
        // Omitted from the serialized form when None, keeping the payload
        // shape compatible with readers that don't expect the field.
        #[serde(skip_serializing_if = "Option::is_none")]
        variant: Option<String>,
    },
    /// Variant cleanup across all hashes (deduplicated via ALL_VARIANTS_UNIQUE_KEY).
    AllVariants,
    /// Outdated-variant cleanup (deduplicated via OUTDATED_VARIANTS_UNIQUE_KEY).
    OutdatedVariants,
    /// Outdated-proxy cleanup (deduplicated via OUTDATED_PROXIES_UNIQUE_KEY).
    OutdatedProxies,
    /// Prune-missing job (deduplicated via PRUNE_MISSING_UNIQUE_KEY).
    Prune,
}
53
/// Payload for jobs pushed onto the "process" queue.
///
/// Serialized to JSON via serde; variant and field names are part of the
/// queued-payload format — do not rename.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Process {
    /// Ingest job for upload data already stored at `identifier`.
    Ingest {
        identifier: String,
        upload_id: Serde<UploadId>,
        declared_alias: Option<Serde<Alias>>,
        // Defaults when absent so payloads queued before this field existed
        // still deserialize.
        #[serde(default)]
        upload_query: UploadQuery,
    },
    /// Generate a processed variant of `source` in `target_format`.
    Generate {
        target_format: InputProcessableFormat,
        source: Serde<Alias>,
        process_path: String,
        process_args: Vec<String>,
    },
}
70
71pub(crate) async fn cleanup_alias(
72 repo: &ArcRepo,
73 alias: Alias,
74 token: DeleteToken,
75) -> Result<(), Error> {
76 let job = serde_json::to_value(Cleanup::Alias {
77 alias: Serde::new(alias),
78 token: Serde::new(token),
79 })
80 .map_err(UploadError::PushJob)?;
81 repo.push(CLEANUP_QUEUE, job, None).await?;
82 Ok(())
83}
84
85pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
86 let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
87 repo.push(CLEANUP_QUEUE, job, None).await?;
88 Ok(())
89}
90
91pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) -> Result<(), Error> {
92 let job = serde_json::to_value(Cleanup::Identifier {
93 identifier: identifier.to_string(),
94 })
95 .map_err(UploadError::PushJob)?;
96 repo.push(CLEANUP_QUEUE, job, None).await?;
97 Ok(())
98}
99
100async fn cleanup_variants(
101 repo: &ArcRepo,
102 hash: Hash,
103 variant: Option<String>,
104) -> Result<(), Error> {
105 let job =
106 serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
107 repo.push(CLEANUP_QUEUE, job, None).await?;
108 Ok(())
109}
110
111pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
112 let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
113 if repo
114 .push(CLEANUP_QUEUE, job, Some(OUTDATED_PROXIES_UNIQUE_KEY))
115 .await?
116 .is_none()
117 {
118 tracing::debug!("outdated proxies conflict");
119 }
120 Ok(())
121}
122
123pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
124 let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
125 if repo
126 .push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
127 .await?
128 .is_none()
129 {
130 tracing::debug!("outdated variants conflict");
131 }
132 Ok(())
133}
134
135pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
136 let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
137 if repo
138 .push(CLEANUP_QUEUE, job, Some(ALL_VARIANTS_UNIQUE_KEY))
139 .await?
140 .is_none()
141 {
142 tracing::debug!("all variants conflict");
143 }
144 Ok(())
145}
146
147pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
148 let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?;
149 if repo
150 .push(CLEANUP_QUEUE, job, Some(PRUNE_MISSING_UNIQUE_KEY))
151 .await?
152 .is_none()
153 {
154 tracing::debug!("prune missing conflict");
155 }
156 Ok(())
157}
158
159pub(crate) async fn queue_ingest(
160 repo: &ArcRepo,
161 identifier: &Arc<str>,
162 upload_id: UploadId,
163 declared_alias: Option<Alias>,
164 upload_query: UploadQuery,
165) -> Result<(), Error> {
166 let job = serde_json::to_value(Process::Ingest {
167 identifier: identifier.to_string(),
168 declared_alias: declared_alias.map(Serde::new),
169 upload_id: Serde::new(upload_id),
170 upload_query,
171 })
172 .map_err(UploadError::PushJob)?;
173 repo.push(PROCESS_QUEUE, job, None).await?;
174 Ok(())
175}
176
177pub(crate) async fn queue_generate(
178 repo: &ArcRepo,
179 target_format: InputProcessableFormat,
180 source: Alias,
181 variant: String,
182 process_args: Vec<String>,
183) -> Result<(), Error> {
184 let job = serde_json::to_value(Process::Generate {
185 target_format,
186 source: Serde::new(source),
187 process_path: variant,
188 process_args,
189 })
190 .map_err(UploadError::PushJob)?;
191 repo.push(PROCESS_QUEUE, job, None).await?;
192 Ok(())
193}
194
/// Worker entry point: run the job loop over the "cleanup" queue using
/// `cleanup::perform` as the job callback.
pub(crate) async fn process_cleanup<S: Store + 'static>(state: State<S>) {
    process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
}

/// Worker entry point: run the job loop over the "process" queue using
/// `process::perform` as the job callback.
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>) {
    process_jobs(state, PROCESS_QUEUE, process::perform).await
}
202
/// Drop guard that records per-job metrics: a start counter when created
/// (see `guard`), and a duration histogram plus an end counter from `Drop`.
struct MetricsGuard {
    // Worker identity, emitted as the "worker-id" metric label.
    worker_id: uuid::Uuid,
    // Queue name, emitted as the "queue" metric label.
    queue: &'static str,
    // When the job started; the duration histogram records elapsed time on drop.
    start: Instant,
    // True until `disarm` is called; Drop reports "completed" = !armed.
    armed: bool,
}
209
210impl MetricsGuard {
211 fn guard(worker_id: uuid::Uuid, queue: &'static str) -> Self {
212 metrics::counter!(crate::init_metrics::JOB_START, "queue" => queue, "worker-id" => worker_id.to_string()).increment(1);
213
214 Self {
215 worker_id,
216 queue,
217 start: Instant::now(),
218 armed: true,
219 }
220 }
221
222 fn disarm(mut self) {
223 self.armed = false;
224 }
225}
226
impl Drop for MetricsGuard {
    fn drop(&mut self) {
        // Always runs — including on panic/early return — so every started job
        // produces a duration sample and an end counter. "completed" is true
        // only when `disarm` was called before the guard was dropped.
        // NOTE(review): `JOB_DURAION` is misspelled; presumably the constant
        // is declared with the same spelling in `init_metrics` — confirm there
        // before renaming, since the metric name is what it resolves to.
        metrics::histogram!(crate::init_metrics::JOB_DURAION, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
        metrics::counter!(crate::init_metrics::JOB_END, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).increment(1);
    }
}
233
/// Error from running a job. The variant determines the `repo::JobResult`
/// reported on completion (see `job_result`): `Retry` maps to `Failure`,
/// `Abort` maps to `Aborted`.
pub(super) enum JobError {
    Abort(Error),
    Retry(Error),
}
238
239impl AsRef<Error> for JobError {
240 fn as_ref(&self) -> &Error {
241 match self {
242 Self::Abort(e) | Self::Retry(e) => e,
243 }
244 }
245}
246
247impl Deref for JobError {
248 type Target = Error;
249
250 fn deref(&self) -> &Self::Target {
251 match self {
252 Self::Abort(e) | Self::Retry(e) => e,
253 }
254 }
255}
256
257impl From<JobError> for Error {
258 fn from(value: JobError) -> Self {
259 match value {
260 JobError::Abort(e) | JobError::Retry(e) => e,
261 }
262 }
263}
264
// Result of running a job; the error carries retry/abort intent.
type JobResult<T = ()> = Result<T, JobError>;

// Boxed local (see `crate::future::LocalBoxFuture`) future returned by job callbacks.
type JobFuture<'a> = LocalBoxFuture<'a, JobResult>;
268
/// Extension trait for tagging a `Result`'s error with retry/abort intent,
/// producing a `JobResult`.
trait JobContext {
    type Item;

    /// Map any error to `JobError::Abort` (do not retry).
    fn abort(self) -> JobResult<Self::Item>
    where
        Self: Sized;

    /// Map any error to `JobError::Retry` (may be retried).
    fn retry(self) -> JobResult<Self::Item>
    where
        Self: Sized;
}
280
281impl<T, E> JobContext for Result<T, E>
282where
283 E: Into<Error>,
284{
285 type Item = T;
286
287 fn abort(self) -> JobResult<Self::Item>
288 where
289 Self: Sized,
290 {
291 self.map_err(Into::into).map_err(JobError::Abort)
292 }
293
294 fn retry(self) -> JobResult<Self::Item>
295 where
296 Self: Sized,
297 {
298 self.map_err(Into::into).map_err(JobError::Retry)
299 }
300}
301
302fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
303 match result {
304 Ok(Ok(())) => crate::repo::JobResult::Success,
305 Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
306 Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
307 Err(_) => crate::repo::JobResult::Aborted,
308 }
309}
310
311async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
312where
313 S: Store + 'static,
314 for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
315{
316 let worker_id = uuid::Uuid::new_v4();
317 let state = Rc::new(state);
318
319 loop {
320 tracing::trace!("process_jobs: looping");
321
322 crate::sync::cooperate().await;
323
324 let res = crate::sync::spawn(
326 "job-loop",
327 job_loop(state.clone(), worker_id, queue, callback),
328 )
329 .await;
330
331 match res {
332 Ok(Ok(())) => break,
334
335 Ok(Err(e)) => {
337 tracing::warn!("Error processing jobs: {}", format!("{e}"));
338 tracing::warn!("{}", format!("{e:?}"));
339
340 if e.is_disconnected() {
341 tokio::time::sleep(Duration::from_secs(10)).await;
342 }
343 }
344
345 Err(_) => {
347 tracing::warn!("Panic while processing jobs");
348 }
349 }
350 }
351}
352
/// Pop and run jobs from `queue` forever; returns only when an error escapes.
///
/// Per job: pop from the repo, start a `MetricsGuard`, run the callback
/// wrapped in `heartbeat` on a spawned task, report the outcome via
/// `complete_job`, then surface any job error to the caller.
async fn job_loop<S, F>(
    state: Rc<State<S>>,
    worker_id: uuid::Uuid,
    queue: &'static str,
    callback: F,
) -> Result<(), Error>
where
    S: Store + 'static,
    for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
    loop {
        tracing::trace!("job_loop: looping");

        crate::sync::cooperate().with_poll_timer("cooperate").await;

        async {
            // Blocks until a job is available for this worker.
            let (job_id, job) = state
                .repo
                .pop(queue, worker_id)
                .with_poll_timer("pop-job")
                .await?;

            let guard = MetricsGuard::guard(worker_id, queue);

            // Run the job and its heartbeat together in a spawned task so a
            // panic in the callback is caught as a JoinError below.
            let state2 = state.clone();
            let res = crate::sync::spawn("job-and-heartbeat", async move {
                let state = state2;
                heartbeat(
                    &state.repo,
                    queue,
                    worker_id,
                    job_id,
                    (callback)(&state, job),
                )
                .await
            })
            .await;

            // Report the outcome before propagating any error, so the repo
            // always learns how the job ended.
            state
                .repo
                .complete_job(queue, worker_id, job_id, job_result(&res))
                .with_poll_timer("job-complete")
                .await?;

            // Outer Err = task join failure (treated as Canceled); inner
            // Err = the job's own error.
            res.map_err(|_| UploadError::Canceled)??;

            // Only reached on success: mark the metrics guard completed.
            guard.disarm();

            Ok(()) as Result<(), Error>
        }
        .instrument(tracing::info_span!("tick", %queue, %worker_id))
        .await?;
    }
}
407
/// Drive `fut` to completion while periodically sending job heartbeats.
///
/// Every 5 seconds a heartbeat future is started (at most one in flight at a
/// time); heartbeat failures are logged but do not affect the job. Resolves
/// to the job future's output.
#[tracing::instrument("running-job", skip(repo, queue, worker_id, fut))]
async fn heartbeat<Fut>(
    repo: &ArcRepo,
    queue: &'static str,
    worker_id: uuid::Uuid,
    job_id: JobId,
    fut: Fut,
) -> Fut::Output
where
    Fut: std::future::Future,
{
    // Pin the job future locally so it can be polled repeatedly by select!.
    let mut fut = std::pin::pin!(fut
        .with_poll_timer("job-future")
        .instrument(tracing::info_span!("job-future")));

    let mut interval = tokio::time::interval(Duration::from_secs(5));

    // The in-flight heartbeat future, if any.
    let mut hb = None;

    loop {
        tracing::trace!("heartbeat: looping");

        crate::sync::cooperate().await;

        tokio::select! {
            // `biased` polls in declaration order, so job completion wins
            // over starting/finishing a heartbeat.
            biased;
            output = &mut fut => {
                return output;
            }
            _ = interval.tick() => {
                // Start a heartbeat only if the previous one has finished.
                if hb.is_none() {
                    hb = Some(repo.heartbeat(queue, worker_id, job_id));
                }
            }
            opt = poll_opt(hb.as_mut()), if hb.is_some() => {
                // Heartbeat resolved; clear the slot for the next tick.
                hb.take();

                if let Some(Err(e)) = opt {
                    tracing::warn!("Failed heartbeat\n{}", format!("{e:?}"));
                }
            }
        }
    }
}
452
/// Await the future behind `opt`, or resolve immediately to `None` when absent.
///
/// Used by `heartbeat` to poll an optional in-flight heartbeat inside `select!`.
async fn poll_opt<Fut>(opt: Option<&mut Fut>) -> Option<Fut::Output>
where
    Fut: std::future::Future + Unpin,
{
    let Some(fut) = opt else { return None };
    Some(fut.await)
}