pict_rs/
queue.rs

1use crate::{
2    error::{Error, UploadError},
3    formats::InputProcessableFormat,
4    future::{LocalBoxFuture, WithPollTimer},
5    repo::{Alias, ArcRepo, DeleteToken, Hash, JobId, UploadId},
6    serde_str::Serde,
7    state::State,
8    store::Store,
9    UploadQuery,
10};
11
12use std::{
13    ops::Deref,
14    rc::Rc,
15    sync::Arc,
16    time::{Duration, Instant},
17};
18use tokio::task::JoinError;
19use tracing::Instrument;
20
pub(crate) mod cleanup;
mod process;

// Queue names used as the `queue` key when pushing/popping jobs in the repo.
const CLEANUP_QUEUE: &str = "cleanup";
const PROCESS_QUEUE: &str = "process";

// Unique keys passed to `repo.push` for singleton jobs; `push` returns `None`
// when a job with the same key is already queued (see the `*_conflict` logs below).
const OUTDATED_PROXIES_UNIQUE_KEY: &str = "outdated-proxies";
const OUTDATED_VARIANTS_UNIQUE_KEY: &str = "outdated-variants";
const ALL_VARIANTS_UNIQUE_KEY: &str = "all-variants";
const PRUNE_MISSING_UNIQUE_KEY: &str = "prune-missing";
30
/// Job payloads for the "cleanup" queue.
///
/// Serialized to JSON by the `cleanup_*` functions below and pushed to the
/// repo; consumed by `cleanup::perform`. Variant and field names are part of
/// the persisted wire format — do not rename them.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Cleanup {
    /// Clean up everything associated with a single hash.
    Hash {
        hash: Hash,
    },
    /// Clean up a single store identifier.
    Identifier {
        identifier: String,
    },
    /// Clean up an alias, authorized by its delete token.
    Alias {
        alias: Serde<Alias>,
        token: Serde<DeleteToken>,
    },
    /// Clean up one variant of a hash, or (when `variant` is `None`) presumably
    /// all of them — confirm against `cleanup::perform`.
    Variant {
        hash: Hash,
        // Omitted from JSON when None to keep older payloads readable
        #[serde(skip_serializing_if = "Option::is_none")]
        variant: Option<String>,
    },
    /// Singleton job (see `ALL_VARIANTS_UNIQUE_KEY`).
    AllVariants,
    /// Singleton job (see `OUTDATED_VARIANTS_UNIQUE_KEY`).
    OutdatedVariants,
    /// Singleton job (see `OUTDATED_PROXIES_UNIQUE_KEY`).
    OutdatedProxies,
    /// Singleton job (see `PRUNE_MISSING_UNIQUE_KEY`).
    Prune,
}
53
/// Job payloads for the "process" queue.
///
/// Serialized to JSON by `queue_ingest` / `queue_generate` and consumed by
/// `process::perform`. Variant and field names are part of the persisted
/// wire format — do not rename them.
#[derive(Debug, serde::Deserialize, serde::Serialize)]
enum Process {
    /// Ingest an uploaded file already stored at `identifier`.
    Ingest {
        identifier: String,
        upload_id: Serde<UploadId>,
        declared_alias: Option<Serde<Alias>>,
        // Defaulted so payloads written before this field existed still deserialize
        #[serde(default)]
        upload_query: UploadQuery,
    },
    /// Generate a processed variant of `source` in `target_format`.
    Generate {
        target_format: InputProcessableFormat,
        source: Serde<Alias>,
        process_path: String,
        process_args: Vec<String>,
    },
}
70
71pub(crate) async fn cleanup_alias(
72    repo: &ArcRepo,
73    alias: Alias,
74    token: DeleteToken,
75) -> Result<(), Error> {
76    let job = serde_json::to_value(Cleanup::Alias {
77        alias: Serde::new(alias),
78        token: Serde::new(token),
79    })
80    .map_err(UploadError::PushJob)?;
81    repo.push(CLEANUP_QUEUE, job, None).await?;
82    Ok(())
83}
84
85pub(crate) async fn cleanup_hash(repo: &ArcRepo, hash: Hash) -> Result<(), Error> {
86    let job = serde_json::to_value(Cleanup::Hash { hash }).map_err(UploadError::PushJob)?;
87    repo.push(CLEANUP_QUEUE, job, None).await?;
88    Ok(())
89}
90
91pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) -> Result<(), Error> {
92    let job = serde_json::to_value(Cleanup::Identifier {
93        identifier: identifier.to_string(),
94    })
95    .map_err(UploadError::PushJob)?;
96    repo.push(CLEANUP_QUEUE, job, None).await?;
97    Ok(())
98}
99
100async fn cleanup_variants(
101    repo: &ArcRepo,
102    hash: Hash,
103    variant: Option<String>,
104) -> Result<(), Error> {
105    let job =
106        serde_json::to_value(Cleanup::Variant { hash, variant }).map_err(UploadError::PushJob)?;
107    repo.push(CLEANUP_QUEUE, job, None).await?;
108    Ok(())
109}
110
111pub(crate) async fn cleanup_outdated_proxies(repo: &ArcRepo) -> Result<(), Error> {
112    let job = serde_json::to_value(Cleanup::OutdatedProxies).map_err(UploadError::PushJob)?;
113    if repo
114        .push(CLEANUP_QUEUE, job, Some(OUTDATED_PROXIES_UNIQUE_KEY))
115        .await?
116        .is_none()
117    {
118        tracing::debug!("outdated proxies conflict");
119    }
120    Ok(())
121}
122
123pub(crate) async fn cleanup_outdated_variants(repo: &ArcRepo) -> Result<(), Error> {
124    let job = serde_json::to_value(Cleanup::OutdatedVariants).map_err(UploadError::PushJob)?;
125    if repo
126        .push(CLEANUP_QUEUE, job, Some(OUTDATED_VARIANTS_UNIQUE_KEY))
127        .await?
128        .is_none()
129    {
130        tracing::debug!("outdated variants conflict");
131    }
132    Ok(())
133}
134
135pub(crate) async fn cleanup_all_variants(repo: &ArcRepo) -> Result<(), Error> {
136    let job = serde_json::to_value(Cleanup::AllVariants).map_err(UploadError::PushJob)?;
137    if repo
138        .push(CLEANUP_QUEUE, job, Some(ALL_VARIANTS_UNIQUE_KEY))
139        .await?
140        .is_none()
141    {
142        tracing::debug!("all variants conflict");
143    }
144    Ok(())
145}
146
147pub(crate) async fn prune_missing(repo: &ArcRepo) -> Result<(), Error> {
148    let job = serde_json::to_value(Cleanup::Prune).map_err(UploadError::PushJob)?;
149    if repo
150        .push(CLEANUP_QUEUE, job, Some(PRUNE_MISSING_UNIQUE_KEY))
151        .await?
152        .is_none()
153    {
154        tracing::debug!("prune missing conflict");
155    }
156    Ok(())
157}
158
159pub(crate) async fn queue_ingest(
160    repo: &ArcRepo,
161    identifier: &Arc<str>,
162    upload_id: UploadId,
163    declared_alias: Option<Alias>,
164    upload_query: UploadQuery,
165) -> Result<(), Error> {
166    let job = serde_json::to_value(Process::Ingest {
167        identifier: identifier.to_string(),
168        declared_alias: declared_alias.map(Serde::new),
169        upload_id: Serde::new(upload_id),
170        upload_query,
171    })
172    .map_err(UploadError::PushJob)?;
173    repo.push(PROCESS_QUEUE, job, None).await?;
174    Ok(())
175}
176
177pub(crate) async fn queue_generate(
178    repo: &ArcRepo,
179    target_format: InputProcessableFormat,
180    source: Alias,
181    variant: String,
182    process_args: Vec<String>,
183) -> Result<(), Error> {
184    let job = serde_json::to_value(Process::Generate {
185        target_format,
186        source: Serde::new(source),
187        process_path: variant,
188        process_args,
189    })
190    .map_err(UploadError::PushJob)?;
191    repo.push(PROCESS_QUEUE, job, None).await?;
192    Ok(())
193}
194
/// Run the worker loop for the cleanup queue, handling jobs with `cleanup::perform`.
pub(crate) async fn process_cleanup<S: Store + 'static>(state: State<S>) {
    process_jobs(state, CLEANUP_QUEUE, cleanup::perform).await
}
198
/// Run the worker loop for the process queue, handling jobs with `process::perform`.
pub(crate) async fn process_images<S: Store + 'static>(state: State<S>) {
    process_jobs(state, PROCESS_QUEUE, process::perform).await
}
202
/// Guard that records per-job metrics via its `Drop` impl.
///
/// Created when a job starts (`MetricsGuard::guard`); the drop handler records
/// duration and an end counter labeled `completed = !armed`, so a guard that is
/// `disarm`ed before dropping reports a completed job.
struct MetricsGuard {
    // worker identifier, used as a metric label
    worker_id: uuid::Uuid,
    // queue name, used as a metric label
    queue: &'static str,
    // instant the guard was created; duration measured from here on drop
    start: Instant,
    // true until `disarm`; drop reports `completed` as the negation of this
    armed: bool,
}
209
impl MetricsGuard {
    /// Record the start of a job (bumps the job-start counter) and begin
    /// timing it. Metrics are labeled with the queue name and worker id.
    fn guard(worker_id: uuid::Uuid, queue: &'static str) -> Self {
        metrics::counter!(crate::init_metrics::JOB_START, "queue" => queue, "worker-id" => worker_id.to_string()).increment(1);

        Self {
            worker_id,
            queue,
            start: Instant::now(),
            armed: true,
        }
    }

    /// Mark the job as having finished cleanly: the subsequent drop reports
    /// `completed = true` (drop records `!armed`).
    fn disarm(mut self) {
        self.armed = false;
    }
}
226
impl Drop for MetricsGuard {
    fn drop(&mut self) {
        // Record the job's wall-clock duration and bump the end counter.
        // `completed` is `!armed`: true only when `disarm` ran (clean finish).
        // NOTE(review): `JOB_DURAION` mirrors the constant's spelling in
        // crate::init_metrics; it cannot be renamed from this file alone.
        metrics::histogram!(crate::init_metrics::JOB_DURAION, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).record(self.start.elapsed().as_secs_f64());
        metrics::counter!(crate::init_metrics::JOB_END, "queue" => self.queue, "worker-id" => self.worker_id.to_string(), "completed" => (!self.armed).to_string()).increment(1);
    }
}
233
/// A failed job, split by how `job_result` reports it to the repo:
/// `Retry` maps to `crate::repo::JobResult::Failure`, `Abort` to
/// `crate::repo::JobResult::Aborted`.
pub(super) enum JobError {
    Abort(Error),
    Retry(Error),
}
238
239impl AsRef<Error> for JobError {
240    fn as_ref(&self) -> &Error {
241        match self {
242            Self::Abort(e) | Self::Retry(e) => e,
243        }
244    }
245}
246
247impl Deref for JobError {
248    type Target = Error;
249
250    fn deref(&self) -> &Self::Target {
251        match self {
252            Self::Abort(e) | Self::Retry(e) => e,
253        }
254    }
255}
256
257impl From<JobError> for Error {
258    fn from(value: JobError) -> Self {
259        match value {
260            JobError::Abort(e) | JobError::Retry(e) => e,
261        }
262    }
263}
264
// Outcome of running a job; the error carries the retry/abort classification.
type JobResult<T = ()> = Result<T, JobError>;

// Boxed non-Send future returned by job callbacks (`cleanup::perform` / `process::perform`).
type JobFuture<'a> = LocalBoxFuture<'a, JobResult>;
268
/// Extension trait for converting a fallible result into a [`JobResult`],
/// choosing whether a failure aborts the job or allows it to be retried.
trait JobContext {
    type Item;

    /// Classify a failure as fatal (maps errors to [`JobError::Abort`]).
    fn abort(self) -> JobResult<Self::Item>
    where
        Self: Sized;

    /// Classify a failure as transient (maps errors to [`JobError::Retry`]).
    fn retry(self) -> JobResult<Self::Item>
    where
        Self: Sized;
}
280
281impl<T, E> JobContext for Result<T, E>
282where
283    E: Into<Error>,
284{
285    type Item = T;
286
287    fn abort(self) -> JobResult<Self::Item>
288    where
289        Self: Sized,
290    {
291        self.map_err(Into::into).map_err(JobError::Abort)
292    }
293
294    fn retry(self) -> JobResult<Self::Item>
295    where
296        Self: Sized,
297    {
298        self.map_err(Into::into).map_err(JobError::Retry)
299    }
300}
301
302fn job_result(result: &Result<JobResult, JoinError>) -> crate::repo::JobResult {
303    match result {
304        Ok(Ok(())) => crate::repo::JobResult::Success,
305        Ok(Err(JobError::Retry(_))) => crate::repo::JobResult::Failure,
306        Ok(Err(JobError::Abort(_))) => crate::repo::JobResult::Aborted,
307        Err(_) => crate::repo::JobResult::Aborted,
308    }
309}
310
311async fn process_jobs<S, F>(state: State<S>, queue: &'static str, callback: F)
312where
313    S: Store + 'static,
314    for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
315{
316    let worker_id = uuid::Uuid::new_v4();
317    let state = Rc::new(state);
318
319    loop {
320        tracing::trace!("process_jobs: looping");
321
322        crate::sync::cooperate().await;
323
324        // add a panic boundary by spawning a task
325        let res = crate::sync::spawn(
326            "job-loop",
327            job_loop(state.clone(), worker_id, queue, callback),
328        )
329        .await;
330
331        match res {
332            // clean exit
333            Ok(Ok(())) => break,
334
335            // job error
336            Ok(Err(e)) => {
337                tracing::warn!("Error processing jobs: {}", format!("{e}"));
338                tracing::warn!("{}", format!("{e:?}"));
339
340                if e.is_disconnected() {
341                    tokio::time::sleep(Duration::from_secs(10)).await;
342                }
343            }
344
345            // job panic
346            Err(_) => {
347                tracing::warn!("Panic while processing jobs");
348            }
349        }
350    }
351}
352
/// Single-worker job loop: pop a job from `queue`, run it (with heartbeats),
/// report the outcome to the repo, and repeat.
///
/// Returns only on error (`pop`/`complete_job` failures or a job failure
/// propagated at the end of a tick); `process_jobs` restarts it.
async fn job_loop<S, F>(
    state: Rc<State<S>>,
    worker_id: uuid::Uuid,
    queue: &'static str,
    callback: F,
) -> Result<(), Error>
where
    S: Store + 'static,
    for<'a> F: Fn(&'a State<S>, serde_json::Value) -> JobFuture<'a> + Copy + 'static,
{
    loop {
        tracing::trace!("job_loop: looping");

        crate::sync::cooperate().with_poll_timer("cooperate").await;

        async {
            // Wait for the next job claimed by this worker
            let (job_id, job) = state
                .repo
                .pop(queue, worker_id)
                .with_poll_timer("pop-job")
                .await?;

            let guard = MetricsGuard::guard(worker_id, queue);

            let state2 = state.clone();
            // Run the job in its own task so a panic is contained; `heartbeat`
            // drives the job future while periodically renewing the claim
            let res = crate::sync::spawn("job-and-heartbeat", async move {
                let state = state2;
                heartbeat(
                    &state.repo,
                    queue,
                    worker_id,
                    job_id,
                    (callback)(&state, job),
                )
                .await
            })
            .await;

            // Always report the job's completion status to the repo — even
            // when the job task panicked (mapped to Aborted by `job_result`)
            state
                .repo
                .complete_job(queue, worker_id, job_id, job_result(&res))
                .with_poll_timer("job-complete")
                .await?;

            // Surface a panic/cancel (JoinError) as Canceled, then propagate
            // any job error via its From<JobError> conversion
            res.map_err(|_| UploadError::Canceled)??;

            // Reached only on a fully clean run: record metrics as completed
            guard.disarm();

            Ok(()) as Result<(), Error>
        }
        .instrument(tracing::info_span!("tick", %queue, %worker_id))
        .await?;
    }
}
407
/// Drive `fut` to completion while periodically sending heartbeats for
/// `job_id` so the repo keeps this worker's claim alive.
///
/// Returns the job future's output; heartbeat failures are logged but never
/// interrupt the job.
#[tracing::instrument("running-job", skip(repo, queue, worker_id, fut))]
async fn heartbeat<Fut>(
    repo: &ArcRepo,
    queue: &'static str,
    worker_id: uuid::Uuid,
    job_id: JobId,
    fut: Fut,
) -> Fut::Output
where
    Fut: std::future::Future,
{
    let mut fut = std::pin::pin!(fut
        .with_poll_timer("job-future")
        .instrument(tracing::info_span!("job-future")));

    let mut interval = tokio::time::interval(Duration::from_secs(5));

    // At most one heartbeat future in flight at a time
    let mut hb = None;

    loop {
        tracing::trace!("heartbeat: looping");

        crate::sync::cooperate().await;

        tokio::select! {
            // `biased` polls branches in order: job completion always wins
            // over starting or finishing a heartbeat
            biased;
            output = &mut fut => {
                return output;
            }
            _ = interval.tick() => {
                // Start a heartbeat only if the previous one has finished
                if hb.is_none() {
                    hb = Some(repo.heartbeat(queue, worker_id, job_id));
                }
            }
            opt = poll_opt(hb.as_mut()), if hb.is_some() => {
                // Heartbeat finished — clear the slot so the next tick can
                // start another; failures are logged, not propagated
                hb.take();

                if let Some(Err(e)) = opt {
                    tracing::warn!("Failed heartbeat\n{}", format!("{e:?}"));
                }
            }
        }
    }
}
452
/// Await the future in `opt` if present; resolves to `None` immediately when
/// `opt` is `None`. Used by `heartbeat`'s `select!` to poll an optional
/// in-flight heartbeat future.
async fn poll_opt<Fut>(opt: Option<&mut Fut>) -> Option<Fut::Output>
where
    Fut: std::future::Future + Unpin,
{
    if let Some(fut) = opt {
        Some(fut.await)
    } else {
        None
    }
}