mod select;
mod state;
use std::time::Duration;
use futures::{stream, Stream};
use graphile_worker_database::Database;
use graphile_worker_runtime as runtime;
use graphile_worker_shutdown_signal::ShutdownSignal;
use tracing::warn;
use crate::errors::Result;
use state::{JobSignalStreamData, NextSignal};
/// Identifies what caused the job signal stream to emit an item.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names and
/// from how this file uses them; the code that actually selects a variant lives in the
/// `select` and `state` submodules — confirm there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamSource {
    /// Presumably a tick of the polling interval. This file also emits it as the fallback
    /// source when the Postgres listener reports that it has closed.
    Polling,
    /// Presumably a notification received through the Postgres listener.
    PgListener,
    /// Not produced anywhere in this file; presumably used by a run-once mode — verify
    /// against callers.
    RunOnce,
    /// Presumably a wake-up received on the optional internal signal channel.
    Internal,
}
/// Sending half of the internal job-signal channel. Messages carry no payload (`()`), so a
/// message only signals that something happened.
pub type JobSignalSender = runtime::Sender<()>;
/// Receiving half of the internal job-signal channel; this is the type accepted by
/// [`job_signal_stream_with_receiver`] as `internal_rx`.
pub type JobSignalReceiver = runtime::Receiver<()>;
/// Builds the job signal stream without an internal signal channel.
///
/// Forwards every argument to `job_signal_stream_internal`, passing `None` for the internal
/// receiver.
///
/// # Errors
///
/// Returns whatever error `job_signal_stream_internal` returns; in this file that is a
/// failure to start listening on the database notification channel.
pub async fn job_signal_stream(
    database: Database,
    poll_interval: Duration,
    shutdown_signal: ShutdownSignal,
    concurrency: usize,
) -> Result<impl Stream<Item = StreamSource>> {
    // No internal channel is wired up for this entry point.
    let no_internal_rx = None;
    job_signal_stream_internal(
        database,
        poll_interval,
        shutdown_signal,
        concurrency,
        no_internal_rx,
    )
    .await
}
/// Builds the job signal stream and additionally hands it an internal signal receiver.
///
/// Identical to [`job_signal_stream`] except that `internal_rx` is wrapped in `Some` and
/// passed on to `job_signal_stream_internal`.
///
/// # Errors
///
/// Returns whatever error `job_signal_stream_internal` returns; in this file that is a
/// failure to start listening on the database notification channel.
pub async fn job_signal_stream_with_receiver(
    database: Database,
    poll_interval: Duration,
    shutdown_signal: ShutdownSignal,
    concurrency: usize,
    internal_rx: JobSignalReceiver,
) -> Result<impl Stream<Item = StreamSource>> {
    let internal_channel = Some(internal_rx);
    job_signal_stream_internal(
        database,
        poll_interval,
        shutdown_signal,
        concurrency,
        internal_channel,
    )
    .await
}
/// Shared implementation behind the public `job_signal_stream*` constructors.
///
/// Creates the polling interval, starts listening on the `jobs:insert` database channel,
/// bundles both with the shutdown signal, concurrency and optional internal receiver into a
/// `JobSignalStreamData`, and unfolds that state into a stream of [`StreamSource`] items.
///
/// Each step of the stream:
/// 1. yields a pending source if the state already has one queued;
/// 2. otherwise waits for `select::next_signal` and reacts to its outcome:
///    - `Source(s)`: yields `s`;
///    - `PgListenerClosed`: drops the listener from the state and yields
///      `StreamSource::Polling` instead;
///    - `InternalClosed`: logs a warning and ends the stream;
///    - `Shutdown`: ends the stream.
///
/// # Errors
///
/// Returns the error from `database.listen` if the listener cannot be set up.
async fn job_signal_stream_internal(
    database: Database,
    poll_interval: Duration,
    shutdown_signal: ShutdownSignal,
    concurrency: usize,
    internal_rx: Option<runtime::Receiver<()>>,
) -> Result<impl Stream<Item = StreamSource>> {
    // The interval is created before the listener, exactly as before.
    let ticker = runtime::interval(poll_interval);
    let listener = database.listen("jobs:insert").await?;
    let initial_state =
        JobSignalStreamData::new(ticker, listener, shutdown_signal, concurrency, internal_rx);

    Ok(stream::unfold(initial_state, |mut state| async move {
        // Drain anything that was queued by a previous step before waiting again.
        if let Some(pending) = state.yield_pending_source() {
            return Some((pending, state));
        }

        let source = match select::next_signal(&mut state).await {
            NextSignal::Source(source) => source,
            NextSignal::PgListenerClosed => {
                // The listener is gone; forget it and report this wake-up as a poll.
                state.pg_listener = None;
                StreamSource::Polling
            }
            NextSignal::InternalClosed => {
                warn!("Job signal stream internal channel closed");
                return None;
            }
            NextSignal::Shutdown => return None,
        };

        // NOTE(review): presumably queues further yields of `source` based on the configured
        // concurrency — confirm in the `state` module.
        state.queue_concurrency_yields(source);
        Some((source, state))
    }))
}