1#[cfg(test)]
8mod tests;
9
10pub(crate) mod id_indexer;
11
12use crate::{Error, PgEventId};
13use async_trait::async_trait;
14use disintegrate::{Event, EventListener, EventStore, StreamQuery};
15use disintegrate_serde::Serde;
16use futures::future::join_all;
17use futures::{try_join, Future, StreamExt};
18use sqlx::{PgPool, Postgres, Row, Transaction};
19use std::error::Error as StdError;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
27use crate::event_store::PgEventStore;
28
29pub struct PgEventListener<E, S>
31where
32 E: Event + Clone,
33 S: Serde<E> + Send + Sync,
34{
35 executors: Vec<Box<dyn EventListenerExecutor<E> + Send + Sync>>,
36 event_store: PgEventStore<E, S>,
37 intialize: bool,
38 shutdown_token: CancellationToken,
39}
40
41impl<E, S> PgEventListener<E, S>
42where
43 E: Event + Clone + Send + Sync + 'static,
44 S: Serde<E> + Clone + Send + Sync + 'static,
45{
46 pub fn builder(event_store: PgEventStore<E, S>) -> Self {
56 Self {
57 event_store,
58 executors: vec![],
59 shutdown_token: CancellationToken::new(),
60 intialize: true,
61 }
62 }
63
64 pub fn uninitialized(mut self) -> Self {
76 self.intialize = false;
77 self
78 }
79
80 pub fn register_listener<QE>(
91 mut self,
92 event_listener: impl EventListener<PgEventId, QE> + 'static,
93 config: PgEventListenerConfig,
94 ) -> Self
95 where
96 QE: TryFrom<E> + Into<E> + Event + Send + Sync + Clone + 'static,
97 <QE as TryFrom<E>>::Error: StdError + Send + Sync,
98 {
99 self.executors.push(Box::new(PgEventListerExecutor::new(
100 self.event_store.clone(),
101 event_listener,
102 self.shutdown_token.clone(),
103 config,
104 )));
105 self
106 }
107
108 pub async fn start(self) -> Result<(), Error> {
114 if self.intialize {
115 setup(&self.event_store.pool).await?;
116 }
117 let mut handles = vec![];
118 let mut wakers = vec![];
119 for executor in self.executors {
120 executor.init().await?;
121 let (waker, task) = executor.run();
122 if let Some(waker) = waker {
123 wakers.push(waker);
124 }
125 handles.push(task);
126 }
127 if !wakers.is_empty() {
128 let pool = self.event_store.pool.clone();
129 let shutdown = self.shutdown_token.clone();
130 let watch_new_events = tokio::spawn(async move {
131 loop {
132 let mut listener = sqlx::postgres::PgListener::connect_with(&pool).await?;
133 listener.listen("new_events").await?;
134 loop {
135 tokio::select! {
136 msg = listener.try_recv() => {
137 match msg {
138 Ok(Some(notification)) => {
139 for waker in &wakers {
140 waker.wake(notification.payload());
141 }
142 },
143 Ok(None) => {},
144 Err(err @ sqlx::Error::PoolClosed) => return Err(Error::Database(err)),
145 Err(_) => break,
146 }
147 }
148 _ = shutdown.cancelled() => return Ok::<(), Error>(()),
149 }
150 }
151 }
152 });
153 handles.push(watch_new_events);
154 }
155 join_all(handles).await;
156 Ok(())
157 }
158
159 pub async fn start_with_shutdown<F: Future<Output = ()> + Send + 'static>(
169 self,
170 shutdown: F,
171 ) -> Result<(), Error> {
172 let shutdown_token = self.shutdown_token.clone();
173 let shutdown_handle = async move {
174 shutdown.await;
175 shutdown_token.cancel();
176 Ok::<(), Error>(())
177 };
178 try_join!(self.start(), shutdown_handle).map(|_| ())
179 }
180}
181
182#[derive(Debug)]
183pub struct PgEventListenerError {
184 last_processed_event_id: PgEventId,
185}
186
187#[derive(Clone)]
196pub struct PgEventListenerConfig {
197 poll: Duration,
198 fetch_size: usize,
199 notifier_enabled: bool,
200}
201
202impl PgEventListenerConfig {
203 pub fn poller(poll: Duration) -> Self {
213 Self {
214 poll,
215 fetch_size: usize::MAX,
216 notifier_enabled: false,
217 }
218 }
219
220 pub fn fetch_size(mut self, fetch_size: usize) -> Self {
231 self.fetch_size = fetch_size;
232 self
233 }
234
235 pub fn with_notifier(mut self) -> Self {
242 self.notifier_enabled = true;
243 self
244 }
245}
246
247#[async_trait]
248trait EventListenerExecutor<E: Event + Clone> {
249 async fn init(&self) -> Result<(), Error>;
250 fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>);
251}
252
253struct PgEventListerExecutor<L, QE, E, S>
254where
255 QE: TryFrom<E> + Event + Send + Sync + Clone,
256 <QE as TryFrom<E>>::Error: Send + Sync,
257 E: Event + Clone + Sync + Send,
258 S: Serde<E> + Clone + Send + Sync,
259 L: EventListener<PgEventId, QE>,
260{
261 event_store: PgEventStore<E, S>,
262 event_handler: Arc<L>,
263 config: PgEventListenerConfig,
264 wake_channel: (watch::Sender<bool>, watch::Receiver<bool>),
265 shutdown_token: CancellationToken,
266 _event_store_events: PhantomData<E>,
267 _event_listener_events: PhantomData<QE>,
268}
269
270impl<L, QE, E, S> PgEventListerExecutor<L, QE, E, S>
271where
272 E: Event + Clone + Sync + Send + 'static,
273 S: Serde<E> + Clone + Send + Sync + 'static,
274 QE: TryFrom<E> + Event + 'static + Send + Sync + Clone,
275 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
276 L: EventListener<PgEventId, QE> + 'static,
277{
278 pub fn new(
279 event_store: PgEventStore<E, S>,
280 event_handler: L,
281 shutdown_token: CancellationToken,
282 config: PgEventListenerConfig,
283 ) -> Self {
284 Self {
285 event_store,
286 event_handler: Arc::new(event_handler),
287 config,
288 wake_channel: watch::channel(true),
289 shutdown_token,
290 _event_store_events: PhantomData,
291 _event_listener_events: PhantomData,
292 }
293 }
294
295 async fn lock_event_listener(
296 &self,
297 tx: &mut Transaction<'_, Postgres>,
298 ) -> Result<Option<PgEventId>, sqlx::Error> {
299 Ok(sqlx::query(
300 r#"
301 SELECT last_processed_event_id
302 FROM event_listener
303 WHERE id = $1
304 FOR UPDATE SKIP LOCKED
305 "#,
306 )
307 .bind(self.event_handler.id())
308 .fetch_optional(&mut **tx)
309 .await?
310 .map(|r| r.get(0)))
311 }
312
313 async fn release_event_listener(
314 &self,
315 result: Result<PgEventId, PgEventListenerError>,
316 mut tx: Transaction<'_, Postgres>,
317 ) -> Result<(), sqlx::Error> {
318 let last_processed_event_id = match result {
319 Ok(last_processed_event_id) => last_processed_event_id,
320 Err(PgEventListenerError {
321 last_processed_event_id,
322 }) => last_processed_event_id,
323 };
324 sqlx::query(
325 "UPDATE event_listener SET last_processed_event_id = $1, updated_at = now() WHERE id = $2",
326 )
327 .bind(last_processed_event_id)
328 .bind(self.event_handler.id())
329 .execute(&mut *tx)
330 .await?;
331 tx.commit().await
332 }
333
334 pub async fn handle_events_from(
335 &self,
336 mut last_processed_event_id: PgEventId,
337 ) -> Result<PgEventId, PgEventListenerError> {
338 let query = self
339 .event_handler
340 .query()
341 .clone()
342 .change_origin(last_processed_event_id);
343 let mut events_stream = self.event_store.stream(&query).take(self.config.fetch_size);
344
345 while let Some(event) = events_stream.next().await {
346 let event = event.map_err(|_err| PgEventListenerError {
347 last_processed_event_id,
348 })?;
349 let event_id = event.id();
350 match self.event_handler.handle(event).await {
351 Ok(_) => last_processed_event_id = event_id,
352 Err(_) => {
353 return Err(PgEventListenerError {
354 last_processed_event_id,
355 })
356 }
357 }
358 if self.shutdown_token.is_cancelled() {
359 break;
360 }
361 }
362
363 Ok(last_processed_event_id)
364 }
365
366 pub async fn try_execute(&self) -> Result<(), sqlx::Error> {
367 let mut tx = self.event_store.pool.begin().await?;
368 let Some(last_processed_id) = self.lock_event_listener(&mut tx).await? else {
369 return Ok(());
370 };
371 let result = self.handle_events_from(last_processed_id).await;
372 self.release_event_listener(result, tx).await
373 }
374
375 async fn execute(&self) -> Result<(), Error> {
376 let result = self.try_execute().await;
377 match result {
378 Err(sqlx::Error::Io(_)) | Err(sqlx::Error::PoolTimedOut) => Ok(()),
379 Err(err) => Err(Error::Database(err)),
380 _ => Ok(()),
381 }
382 }
383
384 pub fn spawn_task(self) -> JoinHandle<Result<(), Error>> {
385 let shutdown = self.shutdown_token.clone();
386 let mut poll = tokio::time::interval(self.config.poll);
387 poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
388 let mut wake_tx = self.wake_channel.1.clone();
389 tokio::spawn(async move {
390 loop {
391 tokio::select! {
392 Ok(()) = wake_tx.changed() => self.execute().await?,
393 _ = poll.tick() => self.execute().await?,
394 _ = shutdown.cancelled() => return Ok::<(), Error>(()),
395 };
396 }
397 })
398 }
399}
400
401#[async_trait]
402impl<L, QE, E, S> EventListenerExecutor<E> for PgEventListerExecutor<L, QE, E, S>
403where
404 E: Event + Clone + Sync + Send + 'static,
405 S: Serde<E> + Clone + Send + Sync + 'static,
406 QE: TryFrom<E> + Into<E> + Event + 'static + Send + Sync + Clone,
407 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
408 L: EventListener<PgEventId, QE> + 'static,
409{
410 async fn init(&self) -> Result<(), Error> {
411 let mut tx = self.event_store.pool.begin().await?;
412 sqlx::query("INSERT INTO event_listener (id, last_processed_event_id) VALUES ($1, 0) ON CONFLICT (id) DO NOTHING")
413 .bind(self.event_handler.id())
414 .execute(&mut *tx)
415 .await?;
416 tx.commit().await?;
417 Ok(())
418 }
419
420 fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>) {
421 let waker = if self.config.notifier_enabled {
422 Some(ExecutorWaker {
423 wake_tx: self.wake_channel.0.clone(),
424 query: self.event_handler.query().cast().clone(),
425 })
426 } else {
427 None
428 };
429 (waker, self.clone().spawn_task())
430 }
431}
432
433impl<L, QE, E, S> Clone for PgEventListerExecutor<L, QE, E, S>
434where
435 QE: TryFrom<E> + Event + Send + Sync + Clone,
436 <QE as TryFrom<E>>::Error: Send + Sync,
437 E: Event + Clone + Sync + Send,
438 S: Serde<E> + Clone + Send + Sync,
439 L: EventListener<PgEventId, QE>,
440{
441 fn clone(&self) -> Self {
442 Self {
443 event_store: self.event_store.clone(),
444 event_handler: Arc::clone(&self.event_handler),
445 config: self.config.clone(),
446 wake_channel: self.wake_channel.clone(),
447 shutdown_token: self.shutdown_token.clone(),
448 _event_store_events: PhantomData,
449 _event_listener_events: PhantomData,
450 }
451 }
452}
453
454struct ExecutorWaker<E: Event + Clone> {
455 wake_tx: watch::Sender<bool>,
456 query: StreamQuery<PgEventId, E>,
457}
458
459impl<E: Event + Clone> ExecutorWaker<E> {
460 fn wake(&self, event: &str) {
461 if self.query.matches_event(event) {
462 self.wake_tx.send_replace(true);
463 }
464 }
465}
466
467async fn setup(pool: &PgPool) -> Result<(), Error> {
468 sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
469 .execute(pool)
470 .await?;
471 sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
472 .execute(pool)
473 .await?;
474 sqlx::query(include_str!(
475 "listener/sql/trigger_notify_event_listener.sql"
476 ))
477 .execute(pool)
478 .await?;
479 Ok(())
480}