1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use std::sync::Arc;
use chrono::Utc;
use futures::StreamExt;
use tracing::{error, info};
use super::job_execution::run_and_release_job;
use super::{Worker, WorkerRuntimeError};
use crate::streams::job_signal::JobSignalSource;
use crate::streams::job_stream;
use graphile_worker_queries::get_job::get_job;
impl Worker {
/// Runs the worker once and processes all available jobs, then returns.
pub async fn run_once(&self) -> Result<(), WorkerRuntimeError> {
let job_stream = job_stream(
self.database.clone(),
self.shutdown_signal.clone(),
self.task_details.clone(),
self.schema.clone(),
self.worker_id.clone(),
self.forbidden_flags.clone(),
self.use_local_time,
);
let runner = self.runner();
job_stream
.for_each_concurrent(self.concurrency, {
let runner = runner.clone();
move |mut job| {
let runner = runner.clone();
async move {
loop {
let job_id = *job.id();
let has_queue = job.job_queue_id().is_some();
let result = run_and_release_job(
Arc::new(job),
&runner,
&JobSignalSource::RunOnce,
)
.await;
match result {
Ok(_) => {
info!(job_id, "Job processed");
}
Err(e) => {
error!("Error while processing job : {:?}", e);
}
};
if !has_queue {
break;
}
info!(job_id, "Job has queue, fetching another job");
let now = runner.use_local_time.then(Utc::now);
let task_details_guard = runner.task_details.read().await;
let new_job = get_job(
&runner.database,
&task_details_guard,
&runner.schema,
&runner.worker_id,
&runner.forbidden_flags,
now,
)
.await
.unwrap_or(None);
drop(task_details_guard);
let Some(new_job) = new_job else {
break;
};
job = new_job;
}
}
}
})
.await;
Ok(())
}
}