disintegrate_postgres/
listener.rs

1//! PostgreSQL Event Listener
2//!
//! This module provides an implementation of a PostgreSQL event listener.
//! It allows listening to events as they are persisted in the event store.
//! It ensures that events are delivered at least once, so implementations
//! of the `EventListener` trait should handle duplicate event deliveries in case of failures.
7#[cfg(test)]
8mod tests;
9
10pub(crate) mod id_indexer;
11
12use crate::{Error, PgEventId};
13use async_trait::async_trait;
14use disintegrate::{Event, EventListener, EventStore, StreamQuery};
15use disintegrate_serde::Serde;
16use futures::future::join_all;
17use futures::{try_join, Future, StreamExt};
18use sqlx::{PgPool, Postgres, Row, Transaction};
19use std::error::Error as StdError;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
27use crate::event_store::PgEventStore;
28
29/// PostgreSQL event listener implementation.
30pub struct PgEventListener<E, S>
31where
32    E: Event + Clone,
33    S: Serde<E> + Send + Sync,
34{
35    executors: Vec<Box<dyn EventListenerExecutor<E> + Send + Sync>>,
36    event_store: PgEventStore<E, S>,
37    intialize: bool,
38    shutdown_token: CancellationToken,
39}
40
41impl<E, S> PgEventListener<E, S>
42where
43    E: Event + Clone + Send + Sync + 'static,
44    S: Serde<E> + Clone + Send + Sync + 'static,
45{
46    /// Creates a new `PgEventListener` that listens to the events coming from the provided `PgEventStore`
47    ///
48    /// # Parameters
49    ///
50    /// * `event_store`: An instance of `PgEventStore` representing the event store for the listener.
51    ///
52    /// # Returns
53    ///
54    /// A new `PgEventListener` instance.
55    pub fn builder(event_store: PgEventStore<E, S>) -> Self {
56        Self {
57            event_store,
58            executors: vec![],
59            shutdown_token: CancellationToken::new(),
60            intialize: true,
61        }
62    }
63
64    /// Marks the event listener as uninitialized, indicating that the database setup is already
65    /// done.
66    ///
67    /// This method sets the `initialize` flag to `false`. When the flag is unset, the listener will not
68    /// initialize the database. If you set `initialize` to `false`, you must ensure that the
69    /// database is initialized before running the listener. Check the SQL files in the `listener/sql` folder
70    /// to initialize the database.
71    ///
72    /// # Returns
73    ///
74    /// The updated `PgEventListener` instance with the `uninitialized` flag set.
75    pub fn uninitialized(mut self) -> Self {
76        self.intialize = false;
77        self
78    }
79
80    /// Registers an event listener to the `PgEventListener`.
81    ///
82    /// # Parameters
83    ///
84    /// * `event_listner`: An implementation of the `EventListener` trait for the specified event type `QE`.
85    /// * `config`: A `PgEventListenerConfig` instance representing the configuration for the event listener.
86    ///
87    /// # Returns
88    ///
89    /// The updated `PgEventListener` instance with the registered event handler.
90    pub fn register_listener<QE>(
91        mut self,
92        event_listener: impl EventListener<PgEventId, QE> + 'static,
93        config: PgEventListenerConfig,
94    ) -> Self
95    where
96        QE: TryFrom<E> + Into<E> + Event + Send + Sync + Clone + 'static,
97        <QE as TryFrom<E>>::Error: StdError + Send + Sync,
98    {
99        self.executors.push(Box::new(PgEventListerExecutor::new(
100            self.event_store.clone(),
101            event_listener,
102            self.shutdown_token.clone(),
103            config,
104        )));
105        self
106    }
107
108    /// Starts the listener process for all registered event listeners.
109    ///
110    /// # Returns
111    ///
112    /// A `Result` indicating the success or failure of the listener process.
113    pub async fn start(self) -> Result<(), Error> {
114        if self.intialize {
115            setup(&self.event_store.pool).await?;
116        }
117        let mut handles = vec![];
118        let mut wakers = vec![];
119        for executor in self.executors {
120            executor.init().await?;
121            let (waker, task) = executor.run();
122            if let Some(waker) = waker {
123                wakers.push(waker);
124            }
125            handles.push(task);
126        }
127        if !wakers.is_empty() {
128            let pool = self.event_store.pool.clone();
129            let shutdown = self.shutdown_token.clone();
130            let watch_new_events = tokio::spawn(async move {
131                loop {
132                    let mut listener = sqlx::postgres::PgListener::connect_with(&pool).await?;
133                    listener.listen("new_events").await?;
134                    loop {
135                        tokio::select! {
136                            msg = listener.try_recv() => {
137                                match msg {
138                                    Ok(Some(notification)) => {
139                                        for waker in &wakers {
140                                            waker.wake(notification.payload());
141                                        }
142                                    },
143                                    Ok(None) => {},
144                                    Err(err @ sqlx::Error::PoolClosed) => return Err(Error::Database(err)),
145                                    Err(_) => break,
146                                }
147                            }
148                            _ = shutdown.cancelled() => return Ok::<(), Error>(()),
149                        }
150                    }
151                }
152            });
153            handles.push(watch_new_events);
154        }
155        join_all(handles).await;
156        Ok(())
157    }
158
159    /// Starts the listener process for all the registered event listeners with a shutdown signal.
160    ///
161    /// # Parameters
162    ///
163    /// * `shutdown`: A future that represents the shutdown signal.
164    ///
165    /// # Returns
166    ///
167    /// A `Result` indicating the success or failure of the listener process.
168    pub async fn start_with_shutdown<F: Future<Output = ()> + Send + 'static>(
169        self,
170        shutdown: F,
171    ) -> Result<(), Error> {
172        let shutdown_token = self.shutdown_token.clone();
173        let shutdown_handle = async move {
174            shutdown.await;
175            shutdown_token.cancel();
176            Ok::<(), Error>(())
177        };
178        try_join!(self.start(), shutdown_handle).map(|_| ())
179    }
180}
181
182#[derive(Debug)]
183pub struct PgEventListenerError {
184    last_processed_event_id: PgEventId,
185}
186
/// PostgreSQL listener Configuration.
///
/// # Properties:
///
/// * `poll`: The `poll` property represents the interval at which the
///   listener should poll for new events from the event store. This determines how frequently the
///   event handler will handle new events.
/// * `fetch_size`: The maximum number of events fetched from the event store at a time.
///   It defaults to `usize::MAX`.
/// * `notifier_enabled`: The `notifier_enabled` indicates if the listener is configured to handle events in "real time".
#[derive(Clone, Debug)]
pub struct PgEventListenerConfig {
    poll: Duration,
    fetch_size: usize,
    notifier_enabled: bool,
}

impl PgEventListenerConfig {
    /// Creates a new `PgEventListenerConfig` with the specified poll interval.
    ///
    /// The fetch size defaults to `usize::MAX` and the db notifier is disabled.
    ///
    /// # Parameters
    ///
    /// * `poll`: The poll interval.
    ///
    /// # Returns
    ///
    /// A new `PgEventListenerConfig` instance.
    pub fn poller(poll: Duration) -> Self {
        Self {
            poll,
            fetch_size: usize::MAX,
            notifier_enabled: false,
        }
    }

    /// Sets the fetch size for the event listener.
    /// The fetch size determines the number of events to fetch from the event store at a time.
    ///
    /// # Parameters
    ///
    /// * `fetch_size`: The number of events to fetch from the event store at a time.
    ///
    /// # Returns
    ///
    /// The updated `PgEventListenerConfig` instance with the fetch size set.
    pub fn fetch_size(mut self, fetch_size: usize) -> Self {
        self.fetch_size = fetch_size;
        self
    }

    /// Sets the db notifier.
    ///
    /// When the db notifier is enabled, the event listener will handle events in "real time".
    ///
    /// # Returns
    ///
    /// The updated `PgEventListenerConfig` instance with the db notifier set.
    pub fn with_notifier(mut self) -> Self {
        self.notifier_enabled = true;
        self
    }
}
246
247#[async_trait]
248trait EventListenerExecutor<E: Event + Clone> {
249    async fn init(&self) -> Result<(), Error>;
250    fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>);
251}
252
253struct PgEventListerExecutor<L, QE, E, S>
254where
255    QE: TryFrom<E> + Event + Send + Sync + Clone,
256    <QE as TryFrom<E>>::Error: Send + Sync,
257    E: Event + Clone + Sync + Send,
258    S: Serde<E> + Clone + Send + Sync,
259    L: EventListener<PgEventId, QE>,
260{
261    event_store: PgEventStore<E, S>,
262    event_handler: Arc<L>,
263    config: PgEventListenerConfig,
264    wake_channel: (watch::Sender<bool>, watch::Receiver<bool>),
265    shutdown_token: CancellationToken,
266    _event_store_events: PhantomData<E>,
267    _event_listener_events: PhantomData<QE>,
268}
269
270impl<L, QE, E, S> PgEventListerExecutor<L, QE, E, S>
271where
272    E: Event + Clone + Sync + Send + 'static,
273    S: Serde<E> + Clone + Send + Sync + 'static,
274    QE: TryFrom<E> + Event + 'static + Send + Sync + Clone,
275    <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
276    L: EventListener<PgEventId, QE> + 'static,
277{
278    pub fn new(
279        event_store: PgEventStore<E, S>,
280        event_handler: L,
281        shutdown_token: CancellationToken,
282        config: PgEventListenerConfig,
283    ) -> Self {
284        Self {
285            event_store,
286            event_handler: Arc::new(event_handler),
287            config,
288            wake_channel: watch::channel(true),
289            shutdown_token,
290            _event_store_events: PhantomData,
291            _event_listener_events: PhantomData,
292        }
293    }
294
295    async fn lock_event_listener(
296        &self,
297        tx: &mut Transaction<'_, Postgres>,
298    ) -> Result<Option<PgEventId>, sqlx::Error> {
299        Ok(sqlx::query(
300            r#"
301                SELECT last_processed_event_id 
302                FROM event_listener
303                WHERE id = $1  
304                FOR UPDATE SKIP LOCKED 
305                "#,
306        )
307        .bind(self.event_handler.id())
308        .fetch_optional(&mut **tx)
309        .await?
310        .map(|r| r.get(0)))
311    }
312
313    async fn release_event_listener(
314        &self,
315        result: Result<PgEventId, PgEventListenerError>,
316        mut tx: Transaction<'_, Postgres>,
317    ) -> Result<(), sqlx::Error> {
318        let last_processed_event_id = match result {
319            Ok(last_processed_event_id) => last_processed_event_id,
320            Err(PgEventListenerError {
321                last_processed_event_id,
322            }) => last_processed_event_id,
323        };
324        sqlx::query(
325            "UPDATE event_listener SET last_processed_event_id = $1, updated_at = now() WHERE id = $2",
326        )
327        .bind(last_processed_event_id)
328        .bind(self.event_handler.id())
329        .execute(&mut *tx)
330        .await?;
331        tx.commit().await
332    }
333
334    pub async fn handle_events_from(
335        &self,
336        mut last_processed_event_id: PgEventId,
337    ) -> Result<PgEventId, PgEventListenerError> {
338        let query = self
339            .event_handler
340            .query()
341            .clone()
342            .change_origin(last_processed_event_id);
343        let mut events_stream = self.event_store.stream(&query).take(self.config.fetch_size);
344
345        while let Some(event) = events_stream.next().await {
346            let event = event.map_err(|_err| PgEventListenerError {
347                last_processed_event_id,
348            })?;
349            let event_id = event.id();
350            match self.event_handler.handle(event).await {
351                Ok(_) => last_processed_event_id = event_id,
352                Err(_) => {
353                    return Err(PgEventListenerError {
354                        last_processed_event_id,
355                    })
356                }
357            }
358            if self.shutdown_token.is_cancelled() {
359                break;
360            }
361        }
362
363        Ok(last_processed_event_id)
364    }
365
366    pub async fn try_execute(&self) -> Result<(), sqlx::Error> {
367        let mut tx = self.event_store.pool.begin().await?;
368        let Some(last_processed_id) = self.lock_event_listener(&mut tx).await? else {
369            return Ok(());
370        };
371        let result = self.handle_events_from(last_processed_id).await;
372        self.release_event_listener(result, tx).await
373    }
374
375    async fn execute(&self) -> Result<(), Error> {
376        let result = self.try_execute().await;
377        match result {
378            Err(sqlx::Error::Io(_)) | Err(sqlx::Error::PoolTimedOut) => Ok(()),
379            Err(err) => Err(Error::Database(err)),
380            _ => Ok(()),
381        }
382    }
383
384    pub fn spawn_task(self) -> JoinHandle<Result<(), Error>> {
385        let shutdown = self.shutdown_token.clone();
386        let mut poll = tokio::time::interval(self.config.poll);
387        poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
388        let mut wake_tx = self.wake_channel.1.clone();
389        tokio::spawn(async move {
390            loop {
391                tokio::select! {
392                    Ok(()) =  wake_tx.changed() => self.execute().await?,
393                    _ = poll.tick() => self.execute().await?,
394                    _ = shutdown.cancelled() => return Ok::<(), Error>(()),
395                };
396            }
397        })
398    }
399}
400
401#[async_trait]
402impl<L, QE, E, S> EventListenerExecutor<E> for PgEventListerExecutor<L, QE, E, S>
403where
404    E: Event + Clone + Sync + Send + 'static,
405    S: Serde<E> + Clone + Send + Sync + 'static,
406    QE: TryFrom<E> + Into<E> + Event + 'static + Send + Sync + Clone,
407    <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
408    L: EventListener<PgEventId, QE> + 'static,
409{
410    async fn init(&self) -> Result<(), Error> {
411        let mut tx = self.event_store.pool.begin().await?;
412        sqlx::query("INSERT INTO event_listener (id, last_processed_event_id) VALUES ($1, 0) ON CONFLICT (id) DO NOTHING")
413                .bind(self.event_handler.id())
414                .execute(&mut *tx)
415                .await?;
416        tx.commit().await?;
417        Ok(())
418    }
419
420    fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>) {
421        let waker = if self.config.notifier_enabled {
422            Some(ExecutorWaker {
423                wake_tx: self.wake_channel.0.clone(),
424                query: self.event_handler.query().cast().clone(),
425            })
426        } else {
427            None
428        };
429        (waker, self.clone().spawn_task())
430    }
431}
432
433impl<L, QE, E, S> Clone for PgEventListerExecutor<L, QE, E, S>
434where
435    QE: TryFrom<E> + Event + Send + Sync + Clone,
436    <QE as TryFrom<E>>::Error: Send + Sync,
437    E: Event + Clone + Sync + Send,
438    S: Serde<E> + Clone + Send + Sync,
439    L: EventListener<PgEventId, QE>,
440{
441    fn clone(&self) -> Self {
442        Self {
443            event_store: self.event_store.clone(),
444            event_handler: Arc::clone(&self.event_handler),
445            config: self.config.clone(),
446            wake_channel: self.wake_channel.clone(),
447            shutdown_token: self.shutdown_token.clone(),
448            _event_store_events: PhantomData,
449            _event_listener_events: PhantomData,
450        }
451    }
452}
453
454struct ExecutorWaker<E: Event + Clone> {
455    wake_tx: watch::Sender<bool>,
456    query: StreamQuery<PgEventId, E>,
457}
458
459impl<E: Event + Clone> ExecutorWaker<E> {
460    fn wake(&self, event: &str) {
461        if self.query.matches_event(event) {
462            self.wake_tx.send_replace(true);
463        }
464    }
465}
466
467async fn setup(pool: &PgPool) -> Result<(), Error> {
468    sqlx::query(include_str!("listener/sql/table_event_listener.sql"))
469        .execute(pool)
470        .await?;
471    sqlx::query(include_str!("listener/sql/fn_notify_event_listener.sql"))
472        .execute(pool)
473        .await?;
474    sqlx::query(include_str!(
475        "listener/sql/trigger_notify_event_listener.sql"
476    ))
477    .execute(pool)
478    .await?;
479    Ok(())
480}