use chrono::Utc;
use futures::{FutureExt, Stream};
use graphile_worker_database::{Database, Schema};
use graphile_worker_shutdown_signal::ShutdownSignal;
use tracing::error;
use crate::Job;
use graphile_worker_queries::{get_job::get_job, task_identifiers::SharedTaskDetails};
pub fn job_stream(
database: Database,
shutdown_signal: ShutdownSignal,
task_details: SharedTaskDetails,
schema: Schema,
worker_id: String,
forbidden_flags: Vec<String>,
use_local_time: bool,
) -> impl Stream<Item = Job> {
futures::stream::unfold((), move |()| {
let database = database.clone();
let task_details = task_details.clone();
let schema = schema.clone();
let worker_id = worker_id.clone();
let forbidden_flags = forbidden_flags.clone();
let job_fut = async move {
let now = use_local_time.then(Utc::now);
let task_details_guard = task_details.read().await;
let job = get_job(
&database,
&task_details_guard,
&schema,
&worker_id,
&forbidden_flags,
now,
)
.await
.map_err(|e| {
error!("Could not get job : {:?}", e);
e
});
match job {
Ok(Some(job)) => Some((job, ())),
Ok(None) => None,
Err(_) => {
error!("Error occurred while trying to get job : {:?}", job);
None
}
}
};
let shutdown_fut = shutdown_signal.clone();
async move {
let job_fut = job_fut.fuse();
let shutdown_fut = shutdown_fut.fuse();
futures::pin_mut!(job_fut, shutdown_fut);
futures::select_biased! {
res = job_fut => res,
_ = shutdown_fut => None
}
}
})
}