indexdb_es/
event_repository.rs

1use crate::{js_event::JsEvent, IndexDBAggregateError};
2use async_trait::async_trait;
3use cqrs_es::persist::{
4    PersistedEventRepository, PersistenceError, ReplayStream, SerializedEvent, SerializedSnapshot,
5};
6use cqrs_es::Aggregate;
7use futures::channel::oneshot::channel;
8use gloo_utils::format::JsValueSerdeExt;
9use idb::*;
10use wasm_bindgen::prelude::*;
11use wasm_bindgen_futures::spawn_local;
12// use futures::SinkExt;
13use serde_json::Value;
14
15/// An event repository relying on a IndexDB database for persistence.
16pub struct IndexDBEventRepository {
17    db_name: String,
18    store_name: String,
19}
20
21#[async_trait]
22impl PersistedEventRepository for IndexDBEventRepository {
23    async fn get_events<A: Aggregate>(
24        &self,
25        aggregate_id: &str,
26    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
27        self.select_events::<A>(aggregate_id).await
28    }
29
30    async fn get_last_events<A: Aggregate>(
31        &self,
32        aggregate_id: &str,
33        last_sequence: usize,
34    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
35        todo!()
36    }
37
38    async fn get_snapshot<A: Aggregate>(
39        &self,
40        aggregate_id: &str,
41    ) -> Result<Option<SerializedSnapshot>, PersistenceError> {
42        todo!()
43    }
44
45    async fn persist<A: Aggregate>(
46        &self,
47        events: &[SerializedEvent],
48        snapshot_update: Option<(String, Value, usize)>,
49    ) -> Result<(), PersistenceError> {
50        match snapshot_update {
51            None => {
52                self.insert_events::<A>(events).await?;
53            }
54            Some((aggregate_id, aggregate, current_snapshot)) => {
55                // if current_snapshot == 1 {
56                //     self.insert::<A>(aggregate, aggregate_id, current_snapshot, events)
57                //         .await?;
58                // } else {
59                //     self.update::<A>(aggregate, aggregate_id, current_snapshot, events)
60                //         .await?;
61                // }
62                todo!()
63            }
64        };
65        Ok(())
66    }
67
68    async fn stream_events<A: Aggregate>(
69        &self,
70        aggregate_id: &str,
71    ) -> Result<ReplayStream, PersistenceError> {
72        todo!()
73    }
74
75    // TODO: aggregate id is unused here, `stream_events` function needs to be broken up
76    async fn stream_all_events<A: Aggregate>(&self) -> Result<ReplayStream, PersistenceError> {
77        todo!()
78    }
79}
80
81impl IndexDBEventRepository {
82    async fn select_events<A: Aggregate>(
83        &self,
84        aggregate_id: &str,
85    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
86        let (sender, receiver) = channel::<Vec<SerializedEvent>>();
87
88        let db_name = self.db_name.clone();
89        let store_name = self.store_name.clone();
90        let aggregate_id = aggregate_id.to_string();
91
92        spawn_local(async move {
93            let db = connect(&db_name).await;
94
95            // Create a transaction in readwrite mode
96            let transaction = db
97                .transaction(&[&store_name], TransactionMode::ReadWrite)
98                .unwrap();
99
100            // Get the object store
101            let store = transaction.object_store(&store_name).unwrap();
102
103            let index = store.index("aggregate_id").unwrap();
104
105            let values = index
106                .get_all(Some(Query::Key(aggregate_id.into())), None)
107                .await
108                .unwrap();
109
110            let events: Vec<SerializedEvent> = values
111                .into_iter()
112                .map(|val| serde_wasm_bindgen::from_value::<JsEvent>(val).unwrap())
113                .map(|js_event| js_event.into())
114                .collect();
115
116            sender.send(events).unwrap();
117        });
118
119        match receiver.await {
120            Ok(result) => Ok(result),
121            Err(_) => Err(PersistenceError::OptimisticLockError),
122        }
123    }
124}
125
126impl IndexDBEventRepository {
127    pub fn new(db_name: Option<String>, store_name: Option<String>) -> Self {
128        Self {
129            db_name: db_name.unwrap_or("cqrs".to_string()),
130            store_name: store_name.unwrap_or("events".to_string()),
131        }
132    }
133
134    pub async fn insert_events<A: Aggregate>(
135        &self,
136        events: &[SerializedEvent],
137    ) -> Result<(), IndexDBAggregateError> {
138        let (sender, receiver) = channel::<Result<(), IndexDBAggregateError>>();
139
140        let db_name = self.db_name.clone();
141        let store_name = self.store_name.clone();
142        let events = events.to_vec();
143
144        spawn_local(async move {
145            let db = connect(&db_name).await;
146            let events: Vec<JsValue> = events
147                .into_iter()
148                .map(|e| JsEvent::from(e.clone()))
149                .map(|e| JsValue::from_serde(&e).unwrap())
150                .collect();
151
152            // Create a transaction in readwrite mode
153            let transaction = db
154                .transaction(&[&store_name], TransactionMode::ReadWrite)
155                .unwrap();
156
157            // Get the object store
158            let store = transaction.object_store(&store_name).unwrap();
159
160            let mut res: Result<(), IndexDBAggregateError> = Ok(());
161            // Add the values to the store
162            for event in events {
163                web_sys::console::log_1(&event);
164                if store.add(&event, None).await.is_err() {
165                    res = Err(IndexDBAggregateError::OptimisticLock);
166                }
167            }
168
169            // Commit the transaction
170            if res.is_ok() {
171                transaction.commit().await.unwrap();
172            }
173
174            sender.send(res).unwrap();
175        });
176
177        match receiver.await {
178            Ok(result) => result,
179            Err(_) => Err(IndexDBAggregateError::OptimisticLock),
180        }
181    }
182}
183
184async fn connect(name: &str) -> Database {
185    // Better error messages in debug mode
186    console_error_panic_hook::set_once();
187
188    // Get a factory instance from global scope
189    let factory = Factory::new().unwrap();
190
191    // Create an open request for the database
192    let mut open_request = factory.open(name, Some(1)).unwrap();
193
194    // Add an upgrade handler for database
195    open_request.on_upgrade_needed(|event| {
196        // Get database instance from event
197        let database = event.database().unwrap();
198
199        // Prepare object store params
200        let mut store_params = ObjectStoreParams::new();
201        store_params.key_path(Some(KeyPath::new_array(vec![
202            "aggregate_type",
203            "aggregate_id",
204            "sequence",
205        ])));
206
207        // Create object store
208        let store = database
209            .create_object_store("events", store_params)
210            .unwrap();
211
212        // Create index on object store
213        store
214            .create_index("aggregate_id", KeyPath::new_single("aggregate_id"), None)
215            .unwrap();
216    });
217
218    open_request.await.unwrap()
219}