disintegrate_postgres/
listener.rs

//! PostgreSQL Event Listener
//!
//! This module provides an implementation of a PostgreSQL event listener.
//! It allows listening to events as they are persisted in the event store.
//! It guarantees that events are delivered at least once, so implementations
//! of the `EventListener` trait must tolerate duplicate event deliveries in case of failures.
7#[cfg(test)]
8mod tests;
9
10pub(crate) mod id_indexer;
11
12use crate::{Error, Migrator, PgEventId};
13use async_trait::async_trait;
14use disintegrate::{Event, EventListener, StreamItem, StreamQuery};
15use disintegrate_serde::Serde;
16use futures::future::join_all;
17use futures::{try_join, Future, StreamExt};
18use sqlx::{Postgres, Row, Transaction};
19use std::error::Error as StdError;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
27use crate::event_store::PgEventStore;
28
29/// PostgreSQL event listener implementation.
30pub struct PgEventListener<E, S>
31where
32    E: Event + Clone,
33    S: Serde<E> + Send + Sync,
34{
35    executors: Vec<Box<dyn EventListenerExecutor<E> + Send + Sync>>,
36    event_store: PgEventStore<E, S>,
37    intialize: bool,
38    shutdown_token: CancellationToken,
39}
40
41impl<E, S> PgEventListener<E, S>
42where
43    E: Event + Clone + Send + Sync + 'static,
44    S: Serde<E> + Clone + Send + Sync + 'static,
45{
46    /// Creates a new `PgEventListener` that listens to the events coming from the provided `PgEventStore`
47    ///
48    /// # Parameters
49    ///
50    /// * `event_store`: An instance of `PgEventStore` representing the event store for the listener.
51    ///
52    /// # Returns
53    ///
54    /// A new `PgEventListener` instance.
55    pub fn builder(event_store: PgEventStore<E, S>) -> Self {
56        Self {
57            event_store,
58            executors: vec![],
59            shutdown_token: CancellationToken::new(),
60            intialize: true,
61        }
62    }
63
64    /// Marks the event listener as uninitialized, indicating that the database setup is already
65    /// done.
66    ///
67    /// This method sets the `initialize` flag to `false`. When the flag is unset, the listener will not
68    /// initialize the database. If you set `initialize` to `false`, you must ensure that the
69    /// database is initialized before running the listener. Check the SQL files in the `listener/sql` folder
70    /// to initialize the database.
71    ///
72    /// # Returns
73    ///
74    /// The updated `PgEventListener` instance with the `uninitialized` flag set.
75    pub fn uninitialized(mut self) -> Self {
76        self.intialize = false;
77        self
78    }
79
80    /// Registers an event listener to the `PgEventListener`.
81    ///
82    /// # Parameters
83    ///
84    /// * `event_listner`: An implementation of the `EventListener` trait for the specified event type `QE`.
85    /// * `config`: A `PgEventListenerConfig` instance representing the configuration for the event listener.
86    ///
87    /// # Returns
88    ///
89    /// The updated `PgEventListener` instance with the registered event handler.
90    pub fn register_listener<QE, L>(
91        mut self,
92        event_listener: L,
93        config: PgEventListenerConfig<impl Retry<L::Error> + Send + Sync + Clone + 'static>,
94    ) -> Self
95    where
96        L: EventListener<PgEventId, QE> + 'static,
97        QE: TryFrom<E> + Into<E> + Event + Send + Sync + Clone + 'static,
98        <QE as TryFrom<E>>::Error: StdError + Send + Sync,
99        L::Error: Send + Sync + 'static,
100    {
101        self.executors.push(Box::new(PgEventListerExecutor::new(
102            self.event_store.clone(),
103            event_listener,
104            self.shutdown_token.clone(),
105            config,
106        )));
107        self
108    }
109
110    /// Starts the listener process for all registered event listeners.
111    ///
112    /// # Returns
113    ///
114    /// A `Result` indicating the success or failure of the listener process.
115    pub async fn start(self) -> Result<(), Error> {
116        if self.intialize {
117            Migrator::new(self.event_store.clone())
118                .init_listener()
119                .await?;
120        }
121        let mut handles = vec![];
122        let mut wakers = vec![];
123        for executor in self.executors {
124            executor.init().await?;
125            let (waker, task) = executor.run();
126            if let Some(waker) = waker {
127                wakers.push(waker);
128            }
129            handles.push(task);
130        }
131        if !wakers.is_empty() {
132            let pool = self.event_store.pool.clone();
133            let shutdown = self.shutdown_token.clone();
134            let watch_new_events = tokio::spawn(async move {
135                loop {
136                    let mut listener = sqlx::postgres::PgListener::connect_with(&pool).await?;
137                    listener.listen("new_events").await?;
138                    loop {
139                        tokio::select! {
140                            msg = listener.try_recv() => {
141                                match msg {
142                                    Ok(Some(notification)) => {
143                                        for waker in &wakers {
144                                            waker.wake(notification.payload());
145                                        }
146                                    },
147                                    Ok(None) => {},
148                                    Err(err @ sqlx::Error::PoolClosed) => return Err(Error::Database(err)),
149                                    Err(_) => break,
150                                }
151                            }
152                            _ = shutdown.cancelled() => return Ok::<(), Error>(()),
153                        }
154                    }
155                }
156            });
157            handles.push(watch_new_events);
158        }
159        join_all(handles).await;
160        Ok(())
161    }
162
163    /// Starts the listener process for all the registered event listeners with a shutdown signal.
164    ///
165    /// # Parameters
166    ///
167    /// * `shutdown`: A future that represents the shutdown signal.
168    ///
169    /// # Returns
170    ///
171    /// A `Result` indicating the success or failure of the listener process.
172    pub async fn start_with_shutdown<F: Future<Output = ()> + Send + 'static>(
173        self,
174        shutdown: F,
175    ) -> Result<(), Error> {
176        let shutdown_token = self.shutdown_token.clone();
177        let shutdown_handle = async move {
178            shutdown.await;
179            shutdown_token.cancel();
180            Ok::<(), Error>(())
181        };
182        try_join!(self.start(), shutdown_handle).map(|_| ())
183    }
184}
185
186#[derive(Debug)]
187
188/// Represents the different error kinds that can occur in the event listener lifecycle.
189pub enum PgEventListenerErrorKind<HE> {
190    /// Error occurred while initializing the database transaction for the event listener.
191    InitTransaction { source: Error },
192    /// Error occurred while acquiring a lock for the event listener in the database.
193    AcquireLock { source: Error },
194    /// Error occurred while fetching the next event from the event store.
195    /// Contains the last successfully processed event ID.
196    FetchNextEvent {
197        source: Error,
198        last_processed_event_id: PgEventId,
199    },
200    /// Error occurred in the user-provided event handler.
201    /// Contains the last successfully processed event ID and the handler's error.
202    Handler {
203        source: HE,
204        last_processed_event_id: PgEventId,
205    },
206    /// Error occurred while releasing the lock or updating the last processed event ID in the database.
207    ReleaseLock {
208        source: Error,
209        last_processed_event_id: PgEventId,
210    },
211}
212
213#[derive(Debug)]
214/// Error type for the event listener, parameterized by the handler error type.
215pub struct PgEventListenerError<HE> {
216    pub kind: PgEventListenerErrorKind<HE>,
217    pub listener_id: String,
218}
219
/// Decision returned by a retry policy for error handling.
///
/// The type is a small plain value, so it derives `Debug`, `Clone`, `Copy`,
/// `PartialEq` and `Eq`: retry policies can be logged and their decisions
/// compared in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryAction {
    /// Stop the event listener.
    Abort,
    /// Wait for the specified duration before retrying again.
    Wait {
        /// How long to wait before the next attempt.
        duration: Duration,
    },
}
227
228/// Trait for implementing retry policies for event listener errors.
229pub trait Retry<HE> {
230    fn retry(&self, error: PgEventListenerError<HE>, attempts: usize) -> RetryAction;
231}
232
233/// A retry policy that always aborts on error.
234#[derive(Clone, Copy, Default)]
235pub struct AbortRetry;
236
237impl<HE> Retry<HE> for AbortRetry {
238    fn retry(&self, _error: PgEventListenerError<HE>, _attempts: usize) -> RetryAction {
239        RetryAction::Abort
240    }
241}
242
243impl<HE, T: Fn(PgEventListenerError<HE>, usize) -> RetryAction> Retry<HE> for T {
244    fn retry(&self, error: PgEventListenerError<HE>, attempts: usize) -> RetryAction {
245        self(error, attempts)
246    }
247}
248
/// Configuration of a PostgreSQL event listener.
///
/// # Properties:
///
/// * `poll`: The interval at which the listener polls the event store for new events.
///   This determines how frequently the event handler handles new events.
/// * `fetch_size`: The maximum number of events fetched from the event store at a time.
/// * `notifier_enabled`: Whether the listener is configured to handle events in "real time".
/// * `retry`: The retry policy applied when a listener run fails.
///
/// The configuration is `Clone` whenever the retry policy `R` is.
#[derive(Clone)]
pub struct PgEventListenerConfig<R> {
    poll: Duration,
    fetch_size: usize,
    notifier_enabled: bool,
    retry: R,
}
277
278impl PgEventListenerConfig<AbortRetry> {
279    /// Creates a new `PgEventListenerConfig` with the specified poll interval.
280    ///
281    /// # Parameters
282    ///
283    /// * `poll`: The poll interval.
284    ///
285    /// # Returns
286    ///
287    /// A new `PgEventListenerConfig` instance.
288    pub fn poller(poll: Duration) -> PgEventListenerConfig<AbortRetry> {
289        PgEventListenerConfig {
290            poll,
291            fetch_size: usize::MAX,
292            notifier_enabled: false,
293            retry: AbortRetry,
294        }
295    }
296}
297
298impl<R> PgEventListenerConfig<R> {
299    /// Sets the fetch size for the event listener.
300    /// The fetch size determines the number of events to fetch from the event store at a time.
301    ///
302    /// # Parameters
303    ///
304    /// * `fetch_size`: The number of events to fetch from the event store at a time.
305    ///
306    /// # Returns
307    ///
308    /// A new `PgEventListenerConfig` instance.
309    pub fn fetch_size(mut self, fetch_size: usize) -> Self {
310        self.fetch_size = fetch_size;
311        self
312    }
313
314    /// Sets the db notifier.
315    ///
316    /// # Returns
317    ///
318    /// The updated `PgEventListenerConfig` instance with the db notifier set.
319    /// When the db notifier is enabled, the event listener will handle events in "real time".
320    pub fn with_notifier(mut self) -> Self {
321        self.notifier_enabled = true;
322        self
323    }
324
325    /// Sets the retry policy for the event listener.
326    ///
327    /// # Parameters
328    /// * `retry`: The retry policy to use.
329    ///
330    /// # Returns
331    /// The updated `PgEventListenerConfig` instance with the retry policy set.
332    pub fn with_retry<R1>(self, retry: R1) -> PgEventListenerConfig<R1> {
333        PgEventListenerConfig {
334            retry,
335            poll: self.poll,
336            fetch_size: self.fetch_size,
337            notifier_enabled: self.notifier_enabled,
338        }
339    }
340}
341
/// Tells the listener loop what to do after one execution step.
enum ListenerExecutionControl {
    /// Keep the listener loop running and wait for the next trigger.
    Continue,
    /// Leave the listener loop; no further events are processed.
    Stop,
}
349
350#[async_trait]
351trait EventListenerExecutor<E: Event + Clone> {
352    async fn init(&self) -> Result<(), Error>;
353    fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>);
354}
355
356/// Executor for a registered event listener, handling polling, notification, and error management.
357struct PgEventListerExecutor<L, QE, E, S, R>
358where
359    QE: TryFrom<E> + Event + Send + Sync + Clone,
360    <QE as TryFrom<E>>::Error: Send + Sync,
361    E: Event + Clone + Sync + Send,
362    S: Serde<E> + Clone + Send + Sync,
363    L: EventListener<PgEventId, QE>,
364    R: Retry<L::Error>,
365    L::Error: Send + Sync + 'static,
366{
367    event_store: PgEventStore<E, S>,
368    event_handler: Arc<L>,
369    config: PgEventListenerConfig<R>,
370    wake_channel: (watch::Sender<bool>, watch::Receiver<bool>),
371    shutdown_token: CancellationToken,
372    _event_store_events: PhantomData<E>,
373    _event_listener_events: PhantomData<QE>,
374}
375
376impl<L, QE, E, S, R> PgEventListerExecutor<L, QE, E, S, R>
377where
378    E: Event + Clone + Sync + Send + 'static,
379    S: Serde<E> + Clone + Send + Sync + 'static,
380    QE: TryFrom<E> + Event + 'static + Send + Sync + Clone,
381    <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
382    L: EventListener<PgEventId, QE> + 'static,
383    R: Retry<L::Error> + Send + Sync + 'static,
384    L::Error: Send + Sync + 'static,
385{
386    /// Creates a new executor for the given event handler and configuration.
387    pub fn new(
388        event_store: PgEventStore<E, S>,
389        event_handler: L,
390        shutdown_token: CancellationToken,
391        config: PgEventListenerConfig<R>,
392    ) -> Self {
393        Self {
394            event_store,
395            event_handler: Arc::new(event_handler),
396            config,
397            wake_channel: watch::channel(true),
398            shutdown_token,
399            _event_store_events: PhantomData,
400            _event_listener_events: PhantomData,
401        }
402    }
403
404    /// Attempts to acquire a lock for this listener in the database, returning the last processed event ID if successful.
405    async fn acquire_listener(
406        &self,
407        tx: &mut Transaction<'_, Postgres>,
408    ) -> Result<Option<PgEventId>, sqlx::Error> {
409        Ok(sqlx::query("SELECT last_processed_event_id FROM event_listener WHERE id = $1 FOR UPDATE SKIP LOCKED")
410        .bind(self.event_handler.id())
411        .fetch_optional(&mut **tx)
412        .await?
413        .map(|r| r.get(0)))
414    }
415
416    /// Releases the lock for this listener and updates the last processed event ID in the database.
417    async fn release_listener(
418        &self,
419        result: Result<PgEventId, PgEventListenerError<L::Error>>,
420        mut tx: Transaction<'_, Postgres>,
421    ) -> Result<(), PgEventListenerError<L::Error>> {
422        let last_processed_event_id = match result {
423            Ok(last_processed_event_id) => last_processed_event_id,
424            Err(PgEventListenerError {
425                kind:
426                    PgEventListenerErrorKind::FetchNextEvent {
427                        last_processed_event_id,
428                        ..
429                    }
430                    | PgEventListenerErrorKind::Handler {
431                        last_processed_event_id,
432                        ..
433                    },
434                ..
435            }) => last_processed_event_id,
436            Err(e) => return Err(e),
437        };
438        sqlx::query(
439            "UPDATE event_listener SET last_processed_event_id = $1, updated_at = now() WHERE id = $2",
440        )
441        .bind(last_processed_event_id)
442        .bind(self.event_handler.id())
443        .execute(&mut *tx)
444        .await.map_err(|e| PgEventListenerError::<L::Error>{
445            kind: PgEventListenerErrorKind::ReleaseLock {
446                source: e.into(),
447                last_processed_event_id
448            },
449            listener_id: self.event_handler.id().to_string(),
450        })?;
451        tx.commit()
452            .await
453            .map_err(|e| PgEventListenerError::<L::Error> {
454                kind: PgEventListenerErrorKind::ReleaseLock {
455                    source: e.into(),
456                    last_processed_event_id,
457                },
458                listener_id: self.event_handler.id().to_string(),
459            })?;
460        result.map(|_| ())
461    }
462
463    /// Handles events from the event store, starting from the given event ID.
464    async fn handle_events_from(
465        &self,
466        mut last_processed_event_id: PgEventId,
467        tx: &mut Transaction<'_, Postgres>,
468    ) -> Result<PgEventId, PgEventListenerError<L::Error>> {
469        let query = self
470            .event_handler
471            .query()
472            .clone()
473            .change_origin(last_processed_event_id);
474
475        let mut stream = self
476            .event_store
477            .stream_with(&mut **tx, &query)
478            .take(self.config.fetch_size);
479
480        while let Some(item) = stream.next().await {
481            let item = item.map_err(|e| PgEventListenerError::<L::Error> {
482                kind: PgEventListenerErrorKind::FetchNextEvent {
483                    source: e,
484                    last_processed_event_id,
485                },
486                listener_id: self.event_handler.id().to_string(),
487            })?;
488
489            let event_id = item.id();
490
491            match item {
492                StreamItem::End(_) => {
493                    last_processed_event_id = event_id;
494                    break;
495                }
496                StreamItem::Event(event) => {
497                    self.event_handler
498                        .handle(event)
499                        .await
500                        .map_err(|e| PgEventListenerError {
501                            kind: PgEventListenerErrorKind::Handler {
502                                source: e,
503                                last_processed_event_id,
504                            },
505                            listener_id: self.event_handler.id().to_string(),
506                        })?;
507
508                    last_processed_event_id = event_id;
509                }
510            }
511            if self.shutdown_token.is_cancelled() {
512                break;
513            }
514        }
515
516        Ok(last_processed_event_id)
517    }
518
519    /// Attempts to execute the event handling logic once, with error mapping and lock management.
520    pub async fn try_execute(&self) -> Result<(), PgEventListenerError<L::Error>> {
521        let mut tx = self
522            .event_store
523            .pool
524            .begin()
525            .await
526            .map_err(|e| PgEventListenerError {
527                kind: PgEventListenerErrorKind::InitTransaction { source: e.into() },
528                listener_id: self.event_handler.id().to_string(),
529            })?;
530        let Some(last_processed_id) =
531            self.acquire_listener(&mut tx)
532                .await
533                .map_err(|e| PgEventListenerError {
534                    kind: PgEventListenerErrorKind::AcquireLock { source: e.into() },
535                    listener_id: self.event_handler.id().to_string(),
536                })?
537        else {
538            return Ok(()); // Another instance is processing
539        };
540
541        let result = self.handle_events_from(last_processed_id, &mut tx).await;
542
543        self.release_listener(result, tx).await
544    }
545
546    /// Executes the event handler with retry logic according to the configured policy.
547    async fn execute(&self) -> ListenerExecutionControl {
548        let mut attempts = 0;
549        loop {
550            match self.try_execute().await {
551                Ok(_) => break ListenerExecutionControl::Continue,
552                Err(err) => match self.config.retry.retry(err, attempts) {
553                    RetryAction::Abort => break ListenerExecutionControl::Stop,
554                    RetryAction::Wait { duration } => {
555                        attempts += 1;
556                        tokio::time::sleep(duration).await;
557                    }
558                },
559            }
560        }
561    }
562
563    /// Spawns the event handler as a background task, polling and/or waiting for notifications.
564    pub fn spawn_task(self) -> JoinHandle<Result<(), Error>> {
565        let shutdown = self.shutdown_token.clone();
566        let mut poll = tokio::time::interval(self.config.poll);
567        poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
568        let mut wake_tx = self.wake_channel.1.clone();
569        tokio::spawn(async move {
570            loop {
571                let outcome = tokio::select! {
572                    Ok(()) = wake_tx.changed() => self.execute().await,
573                    _ = poll.tick() => self.execute().await,
574                    _ = shutdown.cancelled() => return Ok::<(), Error>(()),
575                };
576                match outcome {
577                    ListenerExecutionControl::Continue => {}
578                    ListenerExecutionControl::Stop => break,
579                }
580            }
581            Ok(())
582        })
583    }
584}
585
586#[async_trait]
587impl<L, QE, E, S, R> EventListenerExecutor<E> for PgEventListerExecutor<L, QE, E, S, R>
588where
589    E: Event + Clone + Sync + Send + 'static,
590    S: Serde<E> + Clone + Send + Sync + 'static,
591    QE: TryFrom<E> + Into<E> + Event + 'static + Send + Sync + Clone,
592    <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
593    L: EventListener<PgEventId, QE> + 'static,
594    R: Retry<L::Error> + Clone + Send + Sync + 'static,
595    L::Error: Send + Sync + 'static,
596{
597    async fn init(&self) -> Result<(), Error> {
598        let mut tx = self.event_store.pool.begin().await?;
599        sqlx::query("INSERT INTO event_listener (id, last_processed_event_id) VALUES ($1, 0) ON CONFLICT (id) DO NOTHING")
600                .bind(self.event_handler.id())
601                .execute(&mut *tx)
602                .await?;
603        tx.commit().await?;
604        Ok(())
605    }
606
607    fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>) {
608        let waker = if self.config.notifier_enabled {
609            Some(ExecutorWaker {
610                wake_tx: self.wake_channel.0.clone(),
611                query: self.event_handler.query().cast().clone(),
612            })
613        } else {
614            None
615        };
616        (waker, self.clone().spawn_task())
617    }
618}
619
620impl<L, QE, E, S, R> Clone for PgEventListerExecutor<L, QE, E, S, R>
621where
622    QE: TryFrom<E> + Event + Send + Sync + Clone,
623    <QE as TryFrom<E>>::Error: Send + Sync,
624    E: Event + Clone + Sync + Send,
625    S: Serde<E> + Clone + Send + Sync,
626    L: EventListener<PgEventId, QE>,
627    R: Retry<L::Error> + Clone,
628    L::Error: Send + Sync + 'static,
629{
630    fn clone(&self) -> Self {
631        Self {
632            event_store: self.event_store.clone(),
633            event_handler: Arc::clone(&self.event_handler),
634            config: self.config.clone(),
635            wake_channel: self.wake_channel.clone(),
636            shutdown_token: self.shutdown_token.clone(),
637            _event_store_events: PhantomData,
638            _event_listener_events: PhantomData,
639        }
640    }
641}
642
643/// Waker for executor, used to notify about new events matching a query.
644struct ExecutorWaker<E: Event + Clone> {
645    wake_tx: watch::Sender<bool>,
646    query: StreamQuery<PgEventId, E>,
647}
648
649impl<E: Event + Clone> ExecutorWaker<E> {
650    /// Wakes the executor if the event matches the query.
651    fn wake(&self, event: &str) {
652        if self.query.matches_event(event) {
653            self.wake_tx.send_replace(true);
654        }
655    }
656}