disintegrate_postgres/listener/id_indexer.rs
1//! An `EventListener` implementation for indexing existing fields tagged with `#[id]`.
2use std::{collections::BTreeMap, marker::PhantomData};
3
4use async_trait::async_trait;
5use disintegrate::{DomainIdentifierSet, Event, EventListener, PersistedEvent, StreamQuery};
6use sqlx::{PgPool, Postgres};
7
8use crate::PgEventId;
9
10/// The `PgIdIndexer` is a helper to index existing fields that have been newly tagged with the `#[id]` attribute in events.
11///
12/// # Overview
13///
14/// The `PgIdIndexer` is an `EventListener` responsible for indexing fields in the event store
15/// that are tagged with the `#[id]` attribute. This allows querying of old events based
16/// on this new domain identifier once the indexing is complete. The `PgIdIndexer` listens to events and updates the
17/// `event` table in the database with the appropriate values.
18///
19/// # Workflow
20///
21/// After you have tagged an existing field in your event structure with the `#[id]` attribute to mark it
22/// as a domain identifier:
23///
24/// ```rust
25/// use disintegrate_macros::Event;
26/// use serde::{Serialize, Deserialize};
27/// #[derive(Event, Clone, Serialize, Deserialize)]
28/// struct MyEvent {
29/// #[id]
30/// existing_id: String,
31/// other_field: String,
32/// }
33/// ```
34///
35/// 1. **Register the `PgIdIndexer` as an `EventListener`**: Integrate the indexer
36/// with the event listener system to process the newly tagged domain identifier:
37///
38/// ```rust
39/// use disintegrate_postgres::PgIdIndexer;
40/// use disintegrate_postgres::PgEventListenerConfig;
41/// use disintegrate_postgres::PgEventListener;
42/// use disintegrate_postgres::PgEventStore;
43/// use std::time::Duration;
44/// use disintegrate_macros::Event;
45/// use disintegrate::serde::json::Json;
46/// use serde::{Serialize, Deserialize};
47/// use sqlx::PgPool;
48///
49/// #[derive(Event, Clone, Serialize, Deserialize)]
50/// struct MyEvent {
51/// #[id]
52/// existing_id: String,
53/// other_field: String,
54/// }
55///
56/// async fn setup_listener(pool: PgPool, event_store: PgEventStore<MyEvent, Json<MyEvent>>) {
57/// let id_indexer = PgIdIndexer::<MyEvent>::new("index_exsting_id", pool);
58/// PgEventListener::builder(event_store)
59/// .register_listener(
60/// id_indexer,
61/// PgEventListenerConfig::poller(Duration::from_secs(5)).with_notifier()
62/// )
63/// .start_with_shutdown(shutdown())
64/// .await
65/// .expect("start event listener failed");
66/// }
67///
68/// async fn shutdown() {
69/// tokio::signal::ctrl_c().await.expect("ctrl_c signal failed");
70/// }
71/// ```
72///
73/// 2. **Deploy the application**: Start the application with the updated event
74/// structure and the `PgIdIndexer` integration. Newly created events with the new
75/// domain identifier will automatically have the identifier indexed.
76///
77/// Once the indexing process is complete, you can query the event store using the
78/// new domain identifier to fetch events.
79///
80/// If indexing is done, you can remove the `PgIdIndexer` from the list of registered event listeners.
81pub struct PgIdIndexer<E: Event + Clone> {
82 id: &'static str,
83 pool: PgPool,
84 query: StreamQuery<PgEventId, E>,
85 _event: PhantomData<E>,
86}
87
88impl<E: Event + Clone> PgIdIndexer<E> {
89 /// Creates a new `PgIdIndexer` instance for indexing events.
90 ///
91 /// # Arguments
92 ///
93 /// * `id` - A unique identifier for the listener, used to store the last processed `event_id` in the database.
94 /// * `pool` - A `PgPool` instance for Postgres.
95 pub fn new(id: &'static str, pool: PgPool) -> Self {
96 Self {
97 id,
98 pool,
99 query: disintegrate::query!(E),
100 _event: PhantomData,
101 }
102 }
103}
104
105/// PostgreSQL Id Indexer error.
106#[derive(thiserror::Error, Debug)]
107#[error(transparent)]
108pub struct Error(#[from] sqlx::Error);
109
110#[async_trait]
111impl<E: Event + Clone + Send + Sync> EventListener<PgEventId, E> for PgIdIndexer<E> {
112 type Error = Error;
113
114 fn id(&self) -> &'static str {
115 self.id
116 }
117
118 fn query(&self) -> &StreamQuery<PgEventId, E> {
119 &self.query
120 }
121
122 async fn handle(&self, event: PersistedEvent<PgEventId, E>) -> Result<(), Self::Error> {
123 let mut query_builder = sql_builder(event.id(), event.domain_identifiers());
124 query_builder.build().execute(&self.pool).await?;
125 Ok(())
126 }
127}
128
129fn sql_builder(
130 event_id: PgEventId,
131 domain_identifiers: DomainIdentifierSet,
132) -> sqlx::QueryBuilder<'static, Postgres> {
133 let domain_identifiers = <BTreeMap<_, _> as Clone>::clone(&domain_identifiers).into_iter();
134 let mut sql_builder = sqlx::QueryBuilder::new("UPDATE event SET ");
135 let mut separated = sql_builder.separated(",");
136 for (id_name, id_value) in domain_identifiers {
137 separated.push(format!("{id_name} = "));
138
139 match id_value {
140 disintegrate::IdentifierValue::String(value) => {
141 separated.push_bind_unseparated(value.clone())
142 }
143 disintegrate::IdentifierValue::i64(value) => separated.push_bind_unseparated(value),
144 disintegrate::IdentifierValue::Uuid(value) => separated.push_bind_unseparated(value),
145 };
146 }
147 separated.push_unseparated(" WHERE event_id = ");
148 separated.push_bind_unseparated(event_id);
149
150 sql_builder
151}
152
153#[cfg(test)]
154mod test {
155 use disintegrate::domain_identifiers;
156 use uuid::Uuid;
157
158 use super::sql_builder;
159
160 #[test]
161 fn it_builds_event_update() {
162 let ids =
163 domain_identifiers! {cart_id: "cart1", product_id: 1, customer_id: Uuid::new_v4()};
164
165 let builder = sql_builder(1, ids);
166
167 assert_eq!(
168 builder.sql(),
169 "UPDATE event SET cart_id = $1,customer_id = $2,product_id = $3 WHERE event_id = $4"
170 );
171 }
172}