use std::{
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll},
time::Duration,
};
use diesel::{RunQueryDsl, sql_query};
use futures::{Stream, channel::mpsc};
use crate::{Error, InsertEvent, PgPool, PgTaskId};
pub(crate) const NOTIFY_LISTENER_POLL_INTERVAL: Duration = Duration::from_millis(50);
pub(crate) const NOTIFY_CHANNEL_CAPACITY_MAX: usize = 8192;
pub(crate) fn notify_task_ids(
pool: PgPool,
queue: String,
capacity: usize,
) -> impl Stream<Item = Result<PgTaskId, Error>> + Send {
let (mut sender, receiver) = mpsc::channel(capacity.clamp(1, NOTIFY_CHANNEL_CAPACITY_MAX));
let cancel = Arc::new(AtomicBool::new(false));
let thread_cancel = cancel.clone();
let mut spawn_error_sender = sender.clone();
let thread_pool = pool.clone();
if let Err(error) = std::thread::Builder::new()
.name("apalis-postgres-notify".to_owned())
.spawn(move || {
let mut conn = match thread_pool.get() {
Ok(conn) => conn,
Err(error) => {
let _ = sender.try_send(Err(Error::from(error)));
return;
}
};
if let Err(error) = sql_query("LISTEN \"apalis::job::insert\"").execute(&mut conn) {
let _ = sender.try_send(Err(Error::database(
"starting PostgreSQL LISTEN notification listener",
)(error)));
return;
}
let unlisten = |conn: &mut diesel::PgConnection| {
let _ = sql_query("UNLISTEN \"apalis::job::insert\"").execute(conn);
};
'listen: while !thread_cancel.load(Ordering::Acquire) {
for notification in conn.notifications_iter() {
if thread_cancel.load(Ordering::Acquire) {
break 'listen;
}
let notification = match notification {
Ok(notification) => notification,
Err(error) => {
let _ = sender.try_send(Err(Error::database(
"receiving PostgreSQL notification",
)(error)));
break 'listen;
}
};
let Ok(event) = serde_json::from_str::<InsertEvent>(¬ification.payload)
else {
continue;
};
let (event_queue, ids) = event.into_ids();
if event_queue != queue {
continue;
}
for id in ids {
match sender.try_send(Ok(id)) {
Ok(()) => {}
Err(error) if error.is_disconnected() => break 'listen,
Err(_) => break,
}
}
}
std::thread::sleep(NOTIFY_LISTENER_POLL_INTERVAL);
}
unlisten(&mut conn);
})
{
let _ = spawn_error_sender.try_send(Err(Error::NotifyListener(error.to_string())));
}
NotifyTaskIds {
receiver,
cancel,
pool,
}
}
pub(crate) struct NotifyTaskIds {
receiver: mpsc::Receiver<Result<PgTaskId, Error>>,
cancel: Arc<AtomicBool>,
pool: PgPool,
}
impl Stream for NotifyTaskIds {
type Item = Result<PgTaskId, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_next(cx)
}
}
impl Drop for NotifyTaskIds {
fn drop(&mut self) {
self.cancel.store(true, Ordering::Release);
let pool = self.pool.clone();
let _ = std::thread::Builder::new()
.name("apalis-postgres-notify-drop".to_owned())
.spawn(move || {
if let Ok(mut conn) = pool.get() {
let _ =
sql_query("SELECT pg_notify('apalis::job::insert', '')").execute(&mut conn);
}
});
}
}