disintegrate_postgres/listener/
id_indexer.rs

1//! An `EventListener` implementation for indexing existing fields tagged with `#[id]`.
2use std::{collections::BTreeMap, marker::PhantomData};
3
4use async_trait::async_trait;
5use disintegrate::{DomainIdentifierSet, Event, EventListener, PersistedEvent, StreamQuery};
6use sqlx::{PgPool, Postgres};
7
8use crate::PgEventId;
9
10/// The `PgIdIndexer` is a helper to index existing fields that have been newly tagged with the `#[id]` attribute in events.
11///
12/// # Overview
13///
14/// The `PgIdIndexer` is an `EventListener` responsible for indexing fields in the event store
15/// that are tagged with the `#[id]` attribute. This allows querying of old events based
16/// on this new domain identifier once the indexing is complete. The `PgIdIndexer` listens to events and updates the
17/// `event` table in the database with the appropriate values.
18///
19/// # Workflow
20///
21/// After you have tagged an existing field in your event structure with the `#[id]` attribute to mark it
22/// as a domain identifier:
23///
24///   ```rust
25///   use disintegrate_macros::Event;
26///   use serde::{Serialize, Deserialize};
27///   #[derive(Event, Clone, Serialize, Deserialize)]
28///   struct MyEvent {
29///       #[id]
30///       existing_id: String,
31///       other_field: String,
32///   }
33///   ```
34///
35/// 1. **Register the `PgIdIndexer` as an `EventListener`**: Integrate the indexer
36///    with the event listener system to process the newly tagged domain identifier:
37///
38///    ```rust
39///    use disintegrate_postgres::PgIdIndexer;
40///    use disintegrate_postgres::PgEventListenerConfig;
41///    use disintegrate_postgres::PgEventListener;
42///    use disintegrate_postgres::PgEventStore;
43///    use std::time::Duration;
44///    use disintegrate_macros::Event;
45///    use disintegrate::serde::json::Json;
46///    use serde::{Serialize, Deserialize};
47///    use sqlx::PgPool;
48///
49///    #[derive(Event, Clone, Serialize, Deserialize)]
50///    struct MyEvent {
51///        #[id]
52///        existing_id: String,
53///        other_field: String,
54///    }
55///
56///    async fn setup_listener(pool: PgPool, event_store: PgEventStore<MyEvent, Json<MyEvent>>) {
57///        let id_indexer = PgIdIndexer::<MyEvent>::new("index_exsting_id", pool);
58///        PgEventListener::builder(event_store)
59///            .register_listener(
60///                id_indexer,
61///                PgEventListenerConfig::poller(Duration::from_secs(5)).with_notifier()
62///            )
63///            .start_with_shutdown(shutdown())
64///            .await
65///            .expect("start event listener failed");
66///    }
67///
68///    async fn shutdown() {
69///        tokio::signal::ctrl_c().await.expect("ctrl_c signal failed");
70///    }
71///    ```
72///
73/// 2. **Deploy the application**: Start the application with the updated event
74///    structure and the `PgIdIndexer` integration. Newly created events with the new
75///    domain identifier will automatically have the identifier indexed.
76///
77/// Once the indexing process is complete, you can query the event store using the
78/// new domain identifier to fetch events.
79///
80/// If indexing is done, you can remove the `PgIdIndexer` from the list of registered event listeners.
81pub struct PgIdIndexer<E: Event + Clone> {
82    id: &'static str,
83    pool: PgPool,
84    query: StreamQuery<PgEventId, E>,
85    _event: PhantomData<E>,
86}
87
88impl<E: Event + Clone> PgIdIndexer<E> {
89    /// Creates a new `PgIdIndexer` instance for indexing events.
90    ///
91    /// # Arguments
92    ///
93    /// * `id` - A unique identifier for the listener, used to store the last processed `event_id` in the database.
94    /// * `pool` - A `PgPool` instance for Postgres.
95    pub fn new(id: &'static str, pool: PgPool) -> Self {
96        Self {
97            id,
98            pool,
99            query: disintegrate::query!(E),
100            _event: PhantomData,
101        }
102    }
103}
104
105/// PostgreSQL Id Indexer error.
106#[derive(thiserror::Error, Debug)]
107#[error(transparent)]
108pub struct Error(#[from] sqlx::Error);
109
110#[async_trait]
111impl<E: Event + Clone + Send + Sync> EventListener<PgEventId, E> for PgIdIndexer<E> {
112    type Error = Error;
113
114    fn id(&self) -> &'static str {
115        self.id
116    }
117
118    fn query(&self) -> &StreamQuery<PgEventId, E> {
119        &self.query
120    }
121
122    async fn handle(&self, event: PersistedEvent<PgEventId, E>) -> Result<(), Self::Error> {
123        let mut query_builder = sql_builder(event.id(), event.domain_identifiers());
124        query_builder.build().execute(&self.pool).await?;
125        Ok(())
126    }
127}
128
129fn sql_builder(
130    event_id: PgEventId,
131    domain_identifiers: DomainIdentifierSet,
132) -> sqlx::QueryBuilder<'static, Postgres> {
133    let domain_identifiers = <BTreeMap<_, _> as Clone>::clone(&domain_identifiers).into_iter();
134    let mut sql_builder = sqlx::QueryBuilder::new("UPDATE event SET ");
135    let mut separated = sql_builder.separated(",");
136    for (id_name, id_value) in domain_identifiers {
137        separated.push(format!("{id_name} = "));
138
139        match id_value {
140            disintegrate::IdentifierValue::String(value) => {
141                separated.push_bind_unseparated(value.clone())
142            }
143            disintegrate::IdentifierValue::i64(value) => separated.push_bind_unseparated(value),
144            disintegrate::IdentifierValue::Uuid(value) => separated.push_bind_unseparated(value),
145        };
146    }
147    separated.push_unseparated(" WHERE event_id = ");
148    separated.push_bind_unseparated(event_id);
149
150    sql_builder
151}
152
153#[cfg(test)]
154mod test {
155    use disintegrate::domain_identifiers;
156    use uuid::Uuid;
157
158    use super::sql_builder;
159
160    #[test]
161    fn it_builds_event_update() {
162        let ids =
163            domain_identifiers! {cart_id: "cart1", product_id: 1, customer_id: Uuid::new_v4()};
164
165        let builder = sql_builder(1, ids);
166
167        assert_eq!(
168            builder.sql(),
169            "UPDATE event SET cart_id = $1,customer_id = $2,product_id = $3 WHERE event_id = $4"
170        );
171    }
172}