1#[cfg(test)]
8mod tests;
9
10pub(crate) mod id_indexer;
11
12use crate::{Error, Migrator, PgEventId};
13use async_trait::async_trait;
14use disintegrate::{Event, EventListener, StreamItem, StreamQuery};
15use disintegrate_serde::Serde;
16use futures::future::join_all;
17use futures::{try_join, Future, StreamExt};
18use sqlx::{Postgres, Row, Transaction};
19use std::error::Error as StdError;
20use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::watch;
24use tokio::task::JoinHandle;
25use tokio_util::sync::CancellationToken;
26
27use crate::event_store::PgEventStore;
28
/// Builder and runner for PostgreSQL-backed event listeners.
///
/// Collects one executor per registered listener together with the event
/// store they read from, and spawns them all when `start` is called.
pub struct PgEventListener<E, S>
where
    E: Event + Clone,
    S: Serde<E> + Send + Sync,
{
    // One type-erased executor per listener added through `register_listener`.
    executors: Vec<Box<dyn EventListenerExecutor<E> + Send + Sync>>,
    // Event store whose pool and event stream are shared with every executor.
    event_store: PgEventStore<E, S>,
    // When true, `start` runs `Migrator::init_listener` before spawning tasks.
    intialize: bool,
    // Token handed to every executor; cancelling it requests a shutdown.
    shutdown_token: CancellationToken,
}
40
41impl<E, S> PgEventListener<E, S>
42where
43 E: Event + Clone + Send + Sync + 'static,
44 S: Serde<E> + Clone + Send + Sync + 'static,
45{
46 pub fn builder(event_store: PgEventStore<E, S>) -> Self {
56 Self {
57 event_store,
58 executors: vec![],
59 shutdown_token: CancellationToken::new(),
60 intialize: true,
61 }
62 }
63
64 pub fn uninitialized(mut self) -> Self {
76 self.intialize = false;
77 self
78 }
79
80 pub fn register_listener<QE, L>(
91 mut self,
92 event_listener: L,
93 config: PgEventListenerConfig<impl Retry<L::Error> + Send + Sync + Clone + 'static>,
94 ) -> Self
95 where
96 L: EventListener<PgEventId, QE> + 'static,
97 QE: TryFrom<E> + Into<E> + Event + Send + Sync + Clone + 'static,
98 <QE as TryFrom<E>>::Error: StdError + Send + Sync,
99 L::Error: Send + Sync + 'static,
100 {
101 self.executors.push(Box::new(PgEventListerExecutor::new(
102 self.event_store.clone(),
103 event_listener,
104 self.shutdown_token.clone(),
105 config,
106 )));
107 self
108 }
109
110 pub async fn start(self) -> Result<(), Error> {
116 if self.intialize {
117 Migrator::new(self.event_store.clone())
118 .init_listener()
119 .await?;
120 }
121 let mut handles = vec![];
122 let mut wakers = vec![];
123 for executor in self.executors {
124 executor.init().await?;
125 let (waker, task) = executor.run();
126 if let Some(waker) = waker {
127 wakers.push(waker);
128 }
129 handles.push(task);
130 }
131 if !wakers.is_empty() {
132 let pool = self.event_store.pool.clone();
133 let shutdown = self.shutdown_token.clone();
134 let watch_new_events = tokio::spawn(async move {
135 loop {
136 let mut listener = sqlx::postgres::PgListener::connect_with(&pool).await?;
137 listener.listen("new_events").await?;
138 loop {
139 tokio::select! {
140 msg = listener.try_recv() => {
141 match msg {
142 Ok(Some(notification)) => {
143 for waker in &wakers {
144 waker.wake(notification.payload());
145 }
146 },
147 Ok(None) => {},
148 Err(err @ sqlx::Error::PoolClosed) => return Err(Error::Database(err)),
149 Err(_) => break,
150 }
151 }
152 _ = shutdown.cancelled() => return Ok::<(), Error>(()),
153 }
154 }
155 }
156 });
157 handles.push(watch_new_events);
158 }
159 join_all(handles).await;
160 Ok(())
161 }
162
163 pub async fn start_with_shutdown<F: Future<Output = ()> + Send + 'static>(
173 self,
174 shutdown: F,
175 ) -> Result<(), Error> {
176 let shutdown_token = self.shutdown_token.clone();
177 let shutdown_handle = async move {
178 shutdown.await;
179 shutdown_token.cancel();
180 Ok::<(), Error>(())
181 };
182 try_join!(self.start(), shutdown_handle).map(|_| ())
183 }
184}
185
#[derive(Debug)]
/// Stage at which a listener execution failed, with the underlying cause.
///
/// `HE` is the error type returned by the listener's event handler.
pub enum PgEventListenerErrorKind<HE> {
    /// Beginning the database transaction failed.
    InitTransaction { source: Error },
    /// Querying (and locking) the listener's row failed.
    AcquireLock { source: Error },
    /// Reading the next item from the event stream failed.
    FetchNextEvent {
        source: Error,
        /// Id of the last event processed before the failure.
        last_processed_event_id: PgEventId,
    },
    /// The listener's handler returned an error for an event.
    Handler {
        source: HE,
        /// Id of the last event processed before the failure.
        last_processed_event_id: PgEventId,
    },
    /// Storing the listener's progress or committing the transaction failed.
    ReleaseLock {
        source: Error,
        /// Id that was being stored when the failure occurred.
        last_processed_event_id: PgEventId,
    },
}
212
/// Error produced by one execution of a listener, handed to the retry policy.
#[derive(Debug)]
pub struct PgEventListenerError<HE> {
    /// What failed and why.
    pub kind: PgEventListenerErrorKind<HE>,
    /// Id of the listener whose execution failed.
    pub listener_id: String,
}
219
/// Decision returned by a retry policy after a listener execution failed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryAction {
    /// Give up: the listener is not executed again.
    Abort,
    /// Sleep for `duration`, then execute the listener again.
    Wait { duration: Duration },
}
227
/// Policy consulted whenever a listener execution fails.
///
/// `HE` is the error type returned by the listener's event handler.
pub trait Retry<HE> {
    /// Decides how to react to `error`.
    ///
    /// `attempts` is the number of retries already performed for the current
    /// execution; the executor passes `0` on the first failure.
    fn retry(&self, error: PgEventListenerError<HE>, attempts: usize) -> RetryAction;
}
232
/// Retry policy that never retries: every failure results in an abort.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct AbortRetry;
236
impl<HE> Retry<HE> for AbortRetry {
    /// Always answers [`RetryAction::Abort`], ignoring the error and the
    /// attempt count.
    fn retry(&self, _error: PgEventListenerError<HE>, _attempts: usize) -> RetryAction {
        RetryAction::Abort
    }
}
242
243impl<HE, T: Fn(PgEventListenerError<HE>, usize) -> RetryAction> Retry<HE> for T {
244 fn retry(&self, error: PgEventListenerError<HE>, attempts: usize) -> RetryAction {
245 self(error, attempts)
246 }
247}
248
/// Settings that control how a single listener is executed.
///
/// `R` is the retry policy applied when an execution fails. `Clone` and
/// `Debug` are derived, so they are available whenever `R` provides them.
#[derive(Debug, Clone)]
pub struct PgEventListenerConfig<R> {
    // Period of the timer that triggers an execution.
    poll: Duration,
    // Upper bound on the number of stream items taken per execution.
    fetch_size: usize,
    // Whether the listener asks to be woken by `new_events` notifications.
    notifier_enabled: bool,
    // Policy consulted after a failed execution.
    retry: R,
}
277
278impl PgEventListenerConfig<AbortRetry> {
279 pub fn poller(poll: Duration) -> PgEventListenerConfig<AbortRetry> {
289 PgEventListenerConfig {
290 poll,
291 fetch_size: usize::MAX,
292 notifier_enabled: false,
293 retry: AbortRetry,
294 }
295 }
296}
297
298impl<R> PgEventListenerConfig<R> {
299 pub fn fetch_size(mut self, fetch_size: usize) -> Self {
310 self.fetch_size = fetch_size;
311 self
312 }
313
314 pub fn with_notifier(mut self) -> Self {
321 self.notifier_enabled = true;
322 self
323 }
324
325 pub fn with_retry<R1>(self, retry: R1) -> PgEventListenerConfig<R1> {
333 PgEventListenerConfig {
334 retry,
335 poll: self.poll,
336 fetch_size: self.fetch_size,
337 notifier_enabled: self.notifier_enabled,
338 }
339 }
340}
341
/// Tells the executor task what to do after one execution round.
enum ListenerExecutionControl {
    /// Keep the task running and wait for the next trigger.
    Continue,
    /// Terminate the task.
    Stop,
}
349
/// Type-erased view of a listener executor, so executors for different
/// listener types can be stored together.
#[async_trait]
trait EventListenerExecutor<E: Event + Clone> {
    /// Prepares the executor; called once before `run`.
    async fn init(&self) -> Result<(), Error>;
    /// Spawns the executor task and returns its join handle, together with an
    /// optional waker that can be used to trigger it on new events.
    fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>);
}
355
/// Executor that runs a single listener `L` against the event store.
///
/// `E` is the event type of the store, `QE` the event type the listener
/// handles, and `R` the retry policy.
struct PgEventListerExecutor<L, QE, E, S, R>
where
    QE: TryFrom<E> + Event + Send + Sync + Clone,
    <QE as TryFrom<E>>::Error: Send + Sync,
    E: Event + Clone + Sync + Send,
    S: Serde<E> + Clone + Send + Sync,
    L: EventListener<PgEventId, QE>,
    R: Retry<L::Error>,
    L::Error: Send + Sync + 'static,
{
    // Store used to open transactions and stream events.
    event_store: PgEventStore<E, S>,
    // The listener itself, shared between clones of the executor.
    event_handler: Arc<L>,
    // Polling period, batch size, notifier flag and retry policy.
    config: PgEventListenerConfig<R>,
    // Watch channel used to wake the task; the sender side is handed out
    // through `ExecutorWaker`.
    wake_channel: (watch::Sender<bool>, watch::Receiver<bool>),
    // Cancelled when the listener must shut down.
    shutdown_token: CancellationToken,
    // Markers tying the executor to its event types without storing them.
    _event_store_events: PhantomData<E>,
    _event_listener_events: PhantomData<QE>,
}
375
376impl<L, QE, E, S, R> PgEventListerExecutor<L, QE, E, S, R>
377where
378 E: Event + Clone + Sync + Send + 'static,
379 S: Serde<E> + Clone + Send + Sync + 'static,
380 QE: TryFrom<E> + Event + 'static + Send + Sync + Clone,
381 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
382 L: EventListener<PgEventId, QE> + 'static,
383 R: Retry<L::Error> + Send + Sync + 'static,
384 L::Error: Send + Sync + 'static,
385{
386 pub fn new(
388 event_store: PgEventStore<E, S>,
389 event_handler: L,
390 shutdown_token: CancellationToken,
391 config: PgEventListenerConfig<R>,
392 ) -> Self {
393 Self {
394 event_store,
395 event_handler: Arc::new(event_handler),
396 config,
397 wake_channel: watch::channel(true),
398 shutdown_token,
399 _event_store_events: PhantomData,
400 _event_listener_events: PhantomData,
401 }
402 }
403
404 async fn acquire_listener(
406 &self,
407 tx: &mut Transaction<'_, Postgres>,
408 ) -> Result<Option<PgEventId>, sqlx::Error> {
409 Ok(sqlx::query("SELECT last_processed_event_id FROM event_listener WHERE id = $1 FOR UPDATE SKIP LOCKED")
410 .bind(self.event_handler.id())
411 .fetch_optional(&mut **tx)
412 .await?
413 .map(|r| r.get(0)))
414 }
415
416 async fn release_listener(
418 &self,
419 result: Result<PgEventId, PgEventListenerError<L::Error>>,
420 mut tx: Transaction<'_, Postgres>,
421 ) -> Result<(), PgEventListenerError<L::Error>> {
422 let last_processed_event_id = match result {
423 Ok(last_processed_event_id) => last_processed_event_id,
424 Err(PgEventListenerError {
425 kind:
426 PgEventListenerErrorKind::FetchNextEvent {
427 last_processed_event_id,
428 ..
429 }
430 | PgEventListenerErrorKind::Handler {
431 last_processed_event_id,
432 ..
433 },
434 ..
435 }) => last_processed_event_id,
436 Err(e) => return Err(e),
437 };
438 sqlx::query(
439 "UPDATE event_listener SET last_processed_event_id = $1, updated_at = now() WHERE id = $2",
440 )
441 .bind(last_processed_event_id)
442 .bind(self.event_handler.id())
443 .execute(&mut *tx)
444 .await.map_err(|e| PgEventListenerError::<L::Error>{
445 kind: PgEventListenerErrorKind::ReleaseLock {
446 source: e.into(),
447 last_processed_event_id
448 },
449 listener_id: self.event_handler.id().to_string(),
450 })?;
451 tx.commit()
452 .await
453 .map_err(|e| PgEventListenerError::<L::Error> {
454 kind: PgEventListenerErrorKind::ReleaseLock {
455 source: e.into(),
456 last_processed_event_id,
457 },
458 listener_id: self.event_handler.id().to_string(),
459 })?;
460 result.map(|_| ())
461 }
462
463 async fn handle_events_from(
465 &self,
466 mut last_processed_event_id: PgEventId,
467 tx: &mut Transaction<'_, Postgres>,
468 ) -> Result<PgEventId, PgEventListenerError<L::Error>> {
469 let query = self
470 .event_handler
471 .query()
472 .clone()
473 .change_origin(last_processed_event_id);
474
475 let mut stream = self
476 .event_store
477 .stream_with(&mut **tx, &query)
478 .take(self.config.fetch_size);
479
480 while let Some(item) = stream.next().await {
481 let item = item.map_err(|e| PgEventListenerError::<L::Error> {
482 kind: PgEventListenerErrorKind::FetchNextEvent {
483 source: e,
484 last_processed_event_id,
485 },
486 listener_id: self.event_handler.id().to_string(),
487 })?;
488
489 let event_id = item.id();
490
491 match item {
492 StreamItem::End(_) => {
493 last_processed_event_id = event_id;
494 break;
495 }
496 StreamItem::Event(event) => {
497 self.event_handler
498 .handle(event)
499 .await
500 .map_err(|e| PgEventListenerError {
501 kind: PgEventListenerErrorKind::Handler {
502 source: e,
503 last_processed_event_id,
504 },
505 listener_id: self.event_handler.id().to_string(),
506 })?;
507
508 last_processed_event_id = event_id;
509 }
510 }
511 if self.shutdown_token.is_cancelled() {
512 break;
513 }
514 }
515
516 Ok(last_processed_event_id)
517 }
518
519 pub async fn try_execute(&self) -> Result<(), PgEventListenerError<L::Error>> {
521 let mut tx = self
522 .event_store
523 .pool
524 .begin()
525 .await
526 .map_err(|e| PgEventListenerError {
527 kind: PgEventListenerErrorKind::InitTransaction { source: e.into() },
528 listener_id: self.event_handler.id().to_string(),
529 })?;
530 let Some(last_processed_id) =
531 self.acquire_listener(&mut tx)
532 .await
533 .map_err(|e| PgEventListenerError {
534 kind: PgEventListenerErrorKind::AcquireLock { source: e.into() },
535 listener_id: self.event_handler.id().to_string(),
536 })?
537 else {
538 return Ok(()); };
540
541 let result = self.handle_events_from(last_processed_id, &mut tx).await;
542
543 self.release_listener(result, tx).await
544 }
545
546 async fn execute(&self) -> ListenerExecutionControl {
548 let mut attempts = 0;
549 loop {
550 match self.try_execute().await {
551 Ok(_) => break ListenerExecutionControl::Continue,
552 Err(err) => match self.config.retry.retry(err, attempts) {
553 RetryAction::Abort => break ListenerExecutionControl::Stop,
554 RetryAction::Wait { duration } => {
555 attempts += 1;
556 tokio::time::sleep(duration).await;
557 }
558 },
559 }
560 }
561 }
562
563 pub fn spawn_task(self) -> JoinHandle<Result<(), Error>> {
565 let shutdown = self.shutdown_token.clone();
566 let mut poll = tokio::time::interval(self.config.poll);
567 poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
568 let mut wake_tx = self.wake_channel.1.clone();
569 tokio::spawn(async move {
570 loop {
571 let outcome = tokio::select! {
572 Ok(()) = wake_tx.changed() => self.execute().await,
573 _ = poll.tick() => self.execute().await,
574 _ = shutdown.cancelled() => return Ok::<(), Error>(()),
575 };
576 match outcome {
577 ListenerExecutionControl::Continue => {}
578 ListenerExecutionControl::Stop => break,
579 }
580 }
581 Ok(())
582 })
583 }
584}
585
586#[async_trait]
587impl<L, QE, E, S, R> EventListenerExecutor<E> for PgEventListerExecutor<L, QE, E, S, R>
588where
589 E: Event + Clone + Sync + Send + 'static,
590 S: Serde<E> + Clone + Send + Sync + 'static,
591 QE: TryFrom<E> + Into<E> + Event + 'static + Send + Sync + Clone,
592 <QE as TryFrom<E>>::Error: StdError + 'static + Send + Sync,
593 L: EventListener<PgEventId, QE> + 'static,
594 R: Retry<L::Error> + Clone + Send + Sync + 'static,
595 L::Error: Send + Sync + 'static,
596{
597 async fn init(&self) -> Result<(), Error> {
598 let mut tx = self.event_store.pool.begin().await?;
599 sqlx::query("INSERT INTO event_listener (id, last_processed_event_id) VALUES ($1, 0) ON CONFLICT (id) DO NOTHING")
600 .bind(self.event_handler.id())
601 .execute(&mut *tx)
602 .await?;
603 tx.commit().await?;
604 Ok(())
605 }
606
607 fn run(&self) -> (Option<ExecutorWaker<E>>, JoinHandle<Result<(), Error>>) {
608 let waker = if self.config.notifier_enabled {
609 Some(ExecutorWaker {
610 wake_tx: self.wake_channel.0.clone(),
611 query: self.event_handler.query().cast().clone(),
612 })
613 } else {
614 None
615 };
616 (waker, self.clone().spawn_task())
617 }
618}
619
620impl<L, QE, E, S, R> Clone for PgEventListerExecutor<L, QE, E, S, R>
621where
622 QE: TryFrom<E> + Event + Send + Sync + Clone,
623 <QE as TryFrom<E>>::Error: Send + Sync,
624 E: Event + Clone + Sync + Send,
625 S: Serde<E> + Clone + Send + Sync,
626 L: EventListener<PgEventId, QE>,
627 R: Retry<L::Error> + Clone,
628 L::Error: Send + Sync + 'static,
629{
630 fn clone(&self) -> Self {
631 Self {
632 event_store: self.event_store.clone(),
633 event_handler: Arc::clone(&self.event_handler),
634 config: self.config.clone(),
635 wake_channel: self.wake_channel.clone(),
636 shutdown_token: self.shutdown_token.clone(),
637 _event_store_events: PhantomData,
638 _event_listener_events: PhantomData,
639 }
640 }
641}
642
/// Handle used to wake an executor task when a relevant event is notified.
struct ExecutorWaker<E: Event + Clone> {
    // Sending side of the executor's wake channel.
    wake_tx: watch::Sender<bool>,
    // Query of the listener, used to decide whether a notification concerns it.
    query: StreamQuery<PgEventId, E>,
}
648
649impl<E: Event + Clone> ExecutorWaker<E> {
650 fn wake(&self, event: &str) {
652 if self.query.matches_event(event) {
653 self.wake_tx.send_replace(true);
654 }
655 }
656}